-
Producer Consumer Thread
Hello all,
I've got a problem I can't resolve, and I hope someone can help.
In a few words: I have a shared buffer where producers put items and
consumers retrieve them.
Say I have 4 producer threads producing 5 items each, and 3 consumers taking
those items out of the buffer.
The problem is that I don't know how to tell the consumers to stop retrieving
items when all the producers are done; they just wait forever.
Bear in mind that the consumers don't know how many items the producer
threads produce; the only thing they have in common is the buffer.
I have tried a few methods (e.g. checking whether the buffer is empty, but
this does not work, because it is also true whenever a consumer takes an item
as soon as it has been produced). Or is there a way I can put some sort of
flag in the buffer to say that the last item has been retrieved?
I hope this makes sense; I'm rather confused myself.
Regards,
Chirag
-
Re: Producer Consumer Thread
You would not put that flag in the buffer itself, you would put it in the
object that contains the buffer as a separate variable. And you would set
it to true to mean "There will never be any more data added to the buffer",
right?
PC2
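A minimal sketch of the flag idea above, for illustration only. It assumes an unbounded queue for brevity, and the names `ClosableBuffer`, `close()` and `take()` are hypothetical, not from the thread. The flag sits next to the queue in the containing object, and a consumer stops once the buffer is both closed and drained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical wrapper: the flag lives beside the queue, not inside it.
class ClosableBuffer {
    private final Deque<Object> items = new ArrayDeque<>();
    private boolean closed = false; // true means "no more data will ever be added"

    public synchronized void put(Object item) {
        if (closed) {
            throw new IllegalStateException("buffer is closed");
        }
        items.addLast(item);
        notifyAll(); // wake any consumer waiting for data
    }

    // Intended to be called once, after every producer has finished.
    public synchronized void close() {
        closed = true;
        notifyAll(); // wake consumers so they can notice the flag
    }

    // Returns the next item, or null once the buffer is closed and empty.
    public synchronized Object take() throws InterruptedException {
        while (items.isEmpty() && !closed) {
            wait();
        }
        return items.pollFirst(); // null only when closed and drained
    }
}
```

A consumer could then loop with `while ((item = buffer.take()) != null) { ... }`. Checking emptiness and the flag together under one lock is what avoids the "buffer is momentarily empty" false alarm described in the question.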
-
Re: Producer Consumer Thread
Wouldn't this only work if you had 1 producer and 1 consumer? I tried to
implement it the following way: I have a flag inside my producer thread which
is set to true when it has finished putting items in the buffer, and I also
have a method that returns the status of this flag. On the consumer side, I
pass this flag as an argument to its constructor, and I loop to retrieve
items until the flag is set to true. The main program that runs these threads
would ideally have a variable that stores the status of the flag inside the
producer thread and passes it to the consumer as an argument (so each time it
loops, it retrieves items until the flag is set to true).
Well, this is what I understood from your explanation. If you think it's
wrong, could you please let me know the right way of doing this?
Regards,
Chirag
-
Re: Producer Consumer Thread
No, you have to have a **single** flag that is set when **all** producers
have finished. Not one flag for each producer. If the producers are
independent then I don't know how you could have them set such a flag. How
do you know when they have all finished?
PC2
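One possible answer to "how do you know when they have all finished", sketched under assumptions: if a single thread starts all the producers, it can `join()` each of them and only then set the one shared flag. The class `JoinThenFlag` and its method are hypothetical names, and the queue type is chosen only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; none of these names come from the thread.
class JoinThenFlag {
    // Starts the producers, waits for every one of them, then sets the single flag.
    static void runProducers(int producers, int itemsEach,
                             ConcurrentLinkedQueue<String> buffer,
                             AtomicBoolean allDone) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < itemsEach; i++) {
                    buffer.add("P" + id + "-" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // returns only when that producer's run() has ended
        }
        allDone.set(true); // one flag: no more data will ever be added
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();
        AtomicBoolean allDone = new AtomicBoolean(false);
        runProducers(4, 5, buffer, allDone);
        System.out.println(buffer.size() + " items, allDone=" + allDone.get());
    }
}
```

This only applies when some thread knows about every producer. Fully independent producers would need to report their own completion to the buffer object instead.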
-
Re: Producer Consumer Thread
One way to solve this is to ensure that the object containing the buffer
(BufferObject) also has a reference to ALL producer threads. The producer
threads will need a method to indicate whether they will produce any more
items. The BufferObject can then contain a method which checks whether all
producer threads have stopped producing.
The question is how to ensure that every producer registers itself with the
BufferObject. One way is to put a method in BufferObject:
    Buffer requestBuffer(Producer who)
which returns the buffer if the producer is valid, and null otherwise. If
the producer is not yet in the internal set of producers, this method can
add it.
Hope this helps,
Kent
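The registration idea above might look roughly like this. It is a sketch under assumptions: `ProducerStatus`, `SimpleProducerStatus` and `RegisteringBufferObject` are hypothetical names, "valid" is taken to mean simply non-null, and a thread-safe queue stands in for the `Buffer` type.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical view of a producer: it can say whether it will produce any more.
interface ProducerStatus {
    boolean isFinished();
}

// Hypothetical minimal producer status, with a volatile flag for cross-thread visibility.
class SimpleProducerStatus implements ProducerStatus {
    private volatile boolean finished = false;

    public void finish() { finished = true; }

    public boolean isFinished() { return finished; }
}

class RegisteringBufferObject {
    private final Queue<Object> buffer = new ConcurrentLinkedQueue<>();
    private final Set<ProducerStatus> producers = new HashSet<>();

    // Hands out the buffer and remembers who asked; null for an invalid (null) producer.
    public synchronized Queue<Object> requestBuffer(ProducerStatus who) {
        if (who == null) {
            return null;
        }
        producers.add(who); // a Set, so registering twice is harmless
        return buffer;
    }

    // True when every registered producer reports that it has finished.
    public synchronized boolean allProducersFinished() {
        for (ProducerStatus p : producers) {
            if (!p.isFinished()) {
                return false;
            }
        }
        return true;
    }
}
```

Note that `allProducersFinished()` is also true before anyone has registered, so in this sketch every producer should request the buffer before the consumers start checking.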
-
Re: Producer Consumer Thread
Another way would be to have each producer thread increment a counter in the
BufferObject when it registers, and decrement that counter when it stops
(so it must de-register too). Then when that counter becomes zero, there are
no more producers producing.
PC2
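The counter variant could be sketched as below. `ProducerCounter` and its method names are hypothetical; in practice the counter would probably live inside the buffer object, as the post suggests.

```java
// Hypothetical counter of producers that are still producing.
class ProducerCounter {
    private int active = 0;

    // Called by each producer before it starts producing.
    public synchronized void register() {
        active++;
    }

    // Called by each producer when it stops; wakes waiters when the last one leaves.
    public synchronized void deregister() {
        if (active == 0) {
            throw new IllegalStateException("deregister without matching register");
        }
        active--;
        if (active == 0) {
            notifyAll();
        }
    }

    public synchronized boolean noneActive() {
        return active == 0;
    }

    // Blocks until the counter reaches zero.
    public synchronized void awaitNoneActive() throws InterruptedException {
        while (active > 0) {
            wait();
        }
    }
}
```

One caveat with this sketch: the counter is also zero before the first registration, so it is safer to register all producers (for example in the main thread, before calling `start()`) before any consumer begins testing it.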
-
Re: Producer Consumer Thread
Dear friends,
Thanks for sharing your thoughts. I have tried to implement your ideas (both
combined). I think I have started to get the logic right, but I still have
difficulties: the problem now is that the consumer retrieves only one item
from the buffer and then stops. I think I'll be better off giving you the
listing of my program, because it's becoming a bit hard to explain what is
happening here!
Best regards,
Chirag
public interface Counter
{
    public final int INIT_VALUE = 0; // final constant variables
    public final int FINAL_VALUE = 5;
    public int counter = 0;
    public void set (int value);
    public int value ();
    public int getFirstValue ();
    public int getLastValue ();
    public void next ();
}
///////////////////////////////////////////////////////////////////
public class ModuloCounter implements Counter
{
    private int firstValue;
    private int lastValue;
    private int counter;

    ModuloCounter ()
    {
        this ( INIT_VALUE, FINAL_VALUE );
    }

    ModuloCounter ( int fValue)
    {
        this ( INIT_VALUE ,fValue );
    }

    ModuloCounter ( int fValue, int lValue )
    {
        firstValue = fValue;
        lastValue = lValue;
    }

    public void set ( int value )
    {
        if(value >= firstValue && value < lastValue)
        {
            System.out.println ( "\nCounter set to " + value + "\n" );
            counter = value;
        }
        else
            System.out.println ( "\nCounter set to " + value + " Error!! Out of Range\n" );
    }

    public void next ()
    {
        ++counter;
        counter = counter % lastValue;
    }

    public int value ()
    {
        return counter;
    }

    public int getFirstValue ()
    {
        return firstValue;
    }

    public int getLastValue ()
    {
        return lastValue;
    }
}
//////////////////////////////////////////////////////////////////////
public interface Buffer
{
    public final int BUFFER_SIZE = 8;
    public void put( Object itemBuffered );
    public Object get();
    public boolean isEmpty ();
    public boolean isFull();
}
//////////////////////////////////////////////////////////////////////
public class CircularBuffer implements Buffer
{
    private ModuloCounter in;
    private ModuloCounter out;
    private int counter;
    public Object circularBuffer[];
    private int bufferSize ;
    private Object sharedObject = null;

    public CircularBuffer()
    {
        this( BUFFER_SIZE ) ;
    }

    public CircularBuffer( int b )
    {
        counter = 0;
        bufferSize = b ;
        in = new ModuloCounter( bufferSize );
        out = new ModuloCounter( bufferSize ) ;
        circularBuffer = new Object[ bufferSize ];
        //initialise buffer to +
        for (int i = 0; i < bufferSize ; i++)
        {
            circularBuffer[i] = null;
        }
    }

    public boolean isFull()
    {
        if ( counter == bufferSize )
            return true;
        else
            return false;
    }

    public boolean isEmpty()
    {
        if ( counter == 0 )
            return true;
        else
            return false;
    }

    public void put (Object itemBuffer)
    {
        if ( !isFull() )
        {
            sharedObject = itemBuffer;
            circularBuffer[in.value()] = itemBuffer;
            in.next();
            counter++;
        }
    }

    public Object get()
    {
        if (!isEmpty())
        {
            circularBuffer[out.value()] = null;
            out.next();
            counter--;
        }
        return sharedObject;
    }

    public int getBufferSize()
    {
        return bufferSize;
    }

    public String toString()
    {
        String output = "";
        String bufferContent = "";
        for (int i = 0; i < bufferSize; i++ )
        {
            if (circularBuffer[ i ] == null)
            {
                bufferContent += " _";
            }
            else
            {
                bufferContent += " *";
            }
        }
        output += "[COUNT = " + counter + " | IN = " + (in.value()+1)
            + " | OUT = " + (out.value()+1) + " | <" + bufferContent + " >] ";
        return output;
    }
}
//////////////////////////////////////////////////////////////////////
public class SharedBuffer extends CircularBuffer
{
    private Object sharedObject = null;
    private boolean writeable = true;
    private boolean readable = false;
    private int pCounter = 0;
    private int pItems = 0;
    private boolean StopProducing;

    public SharedBuffer(int b)
    {
        super(b);
    }

    // put an item in the buffer
    public synchronized void put (Object itemBuffer, int i, boolean sp)
    {
        while ( !writeable ) {
            try {
                System.out.println(" Waiting to Produce ");
                wait();
            }
            catch ( InterruptedException e ) {
                System.out.println( e.toString() );
            }
        }
        sharedObject = itemBuffer;
        pItems = i;
        StopProducing = sp;
        super.put (sharedObject);
        pCounter++;
        readable = true;
        System.out.println("SharedBuffer.Put: " + super.toString() + "\n");
        if ( isFull() )
        {
            writeable = false;
            System.out.println( "Buffer Full" );
        }
        notifyAll();
    }

    // remove an item from buffer
    public synchronized Object get()
    {
        while ( !readable ) {
            try {
                System.out.println( "Waiting to consume" );
                wait();
            }
            catch ( InterruptedException e ) {
                System.err.println(e.toString() );
            }
        }
        writeable = true;
        super.get();
        if(StopProducing)
        {
            pCounter--;
        }
        else
        {
            pCounter = pCounter - pItems;
        }
        System.out.println("SharedBuffer.get: " + super.toString());
        if ( isEmpty() )
        {
            readable = false;
            System.out.println( "Buffer empty" );
        }
        notifyAll();
        return sharedObject;
    }

    public int getPCounter()
    {
        return pCounter;
    }
}
////////////////////////////////////////////////////////////////////////
import java.util.Calendar;

public class TimeStampServer extends Thread
{
    // Create a TimeStamp
    Calendar now = Calendar.getInstance();

    public void run()
    {
        setDaemon( true );
        while (true) {}
    }

    public String getTime()
    {
        String timestamp = new String( now.get(Calendar.HOUR_OF_DAY) + ":" +
            now.get(Calendar.MINUTE) + ":" +
            now.get(Calendar.SECOND) + ":" +
            now.get(Calendar.MILLISECOND));
        return timestamp;
    }
}
/////////////////////////////////////////////////////////////////////
public class ThreadDescriptor
{
    private String id;
    private String group;
    private int pri;
    private String time;

    public ThreadDescriptor(Thread td, String t, ThreadGroup grp)
    {
        id = td.getName();
        group = grp.getName();
        pri = grp.getMaxPriority();
        time = t;
    }

    public String toString()
    {
        String output = "";
        output += "[ID = " + id + " | GRP = " + group + " | PRI = " + pri
            + " | TIME = " + time + "]";
        return output;
    }
}
///////////////////////////////////////////////////////////////////////
public class Producer extends Thread
{
    private SharedBuffer pSharedBuffer;
    private int pItems;
    private TimeStampServer pTimeStampServer;
    private int seconds;
    private ThreadGroup pThreadGroup;
    private boolean StopProducing = false;

    public Producer(String name, SharedBuffer sb, int items,
                    TimeStampServer tss, int sec, ThreadGroup tg)
    {
        super(name);
        pSharedBuffer = sb;
        pItems = items;
        pTimeStampServer = tss;
        seconds = sec;
        pThreadGroup = tg;
    }

    // activate the thread
    public void run()
    {
        System.out.println(getName() + " Thread Started\n" );
        for (int i = 0; i < pItems; i++ )
        {
            try {
                Thread.sleep( (int) ( Math.random()* (seconds * 1000) ) );
            }
            catch ( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            ThreadDescriptor td = new ThreadDescriptor( this,
                pTimeStampServer.getTime(), pThreadGroup );
            System.out.println(getName()+ ".put: " + td.toString());
            pSharedBuffer.put(td, pItems, getStopProducing());
        }
        System.out.println( getName() + " produced " + pItems + " items \n");
        StopProducing = true;
    }

    public boolean getStopProducing()
    {
        return StopProducing;
    }
}
/////////////////////////////////////////////////////////////////////
class Consumer extends Thread
{
    private SharedBuffer cSharedBuffer;
    private ThreadGroup cThreadGroup;
    private int cItems = 0;

    public Consumer(String str, SharedBuffer sb, ThreadGroup tg )
    {
        super(str);
        cSharedBuffer = sb;
        cThreadGroup = tg;
    }

    public void run()
    {
        System.out.println(getName() + " Thread Started\n" );
        while (cSharedBuffer.getPCounter() == 0)
        {
            try {
                Thread.sleep( (int) ( Math.random()* 5000 ) );
            }
            catch ( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            System.out.println(getName()+ ".get: " + cSharedBuffer.get() + "\n");
            yield();
            cItems++;
        }
        System.out.println( getName() + " retrieved " + cItems + " items \n");
    }
}
//////////////////////////////////////////////////////////////////////
public class ThreadManager
{
    public static void main(String args[])
    {
        SharedBuffer sb = new SharedBuffer(5);
        TimeStampServer tss = new TimeStampServer();

        ThreadGroup groupProd1 = new ThreadGroup("ProdHigh");
        ThreadGroup groupProd2 = new ThreadGroup("ProdLow");
        groupProd1.setMaxPriority(8);
        groupProd2.setMaxPriority(3);

        Producer prod1 = new Producer( "Prod1", sb, 5, tss, 3, groupProd1 );
        Producer prod2 = new Producer( "Prod2", sb, 5, tss, 2, groupProd1 );
        Producer prod3 = new Producer( "Prod3", sb, 5, tss, 3, groupProd2 );
        Producer prod4 = new Producer( "Prod4", sb, 5, tss, 1, groupProd2 );

        ThreadGroup groupCons = new ThreadGroup("Consumer");
        Consumer cons1 = new Consumer( "Consumer1", sb, groupCons );
        Consumer cons2 = new Consumer( "Consumer2", sb, groupCons );
        Consumer cons3 = new Consumer( "Consumer3", sb, groupCons );

        prod1.start();
        prod2.start();
        prod3.start();
        prod4.start();
        cons1.start();
        cons2.start();
        cons3.start();
    }
}
/////////////////////////////////////////////////////////////////////
I hope you can all read this. If anything is unclear, let me know where you got lost and I might
be able to tell you a bit more about it....
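As far as the listing shows, the one-item symptom comes from the consumer loop `while (cSharedBuffer.getPCounter() == 0)`: each producer passes `getStopProducing()` while it is still `false`, so the first `get()` executes `pCounter = pCounter - pItems`, the counter becomes non-zero, and the loop ends. The sketch below is one possible restructuring, not a drop-in fix for the listing. It assumes the number of producers is known when the buffer is created, and the names `DrainableBuffer`, `producerDone()` and `DrainDemo` are hypothetical. Consumers loop until `take()` returns null, which happens only when the buffer is empty and no producer is still active.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bounded buffer that also tracks how many producers are still active.
class DrainableBuffer {
    private final Deque<Object> items = new ArrayDeque<>();
    private final int capacity;
    private int activeProducers;

    DrainableBuffer(int capacity, int producers) {
        this.capacity = capacity;
        this.activeProducers = producers; // fixed before any thread starts
    }

    synchronized void put(Object item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // buffer full: wait for a consumer to free a slot
        }
        items.addLast(item);
        notifyAll();
    }

    // Each producer calls this exactly once, after its last put().
    synchronized void producerDone() {
        activeProducers--;
        notifyAll(); // let waiting consumers re-check the exit condition
    }

    // Returns an item, or null when the buffer is empty and no producer is left.
    synchronized Object take() throws InterruptedException {
        while (items.isEmpty() && activeProducers > 0) {
            wait();
        }
        Object item = items.pollFirst();
        notifyAll(); // a slot may have opened for a waiting producer
        return item;
    }
}

class DrainDemo {
    // Runs the producers and consumers to completion; returns the total items consumed.
    static int run(int producers, int itemsEach, int consumers) throws InterruptedException {
        DrainableBuffer buffer = new DrainableBuffer(5, producers);
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[producers + consumers];
        for (int p = 0; p < producers; p++) {
            final int id = p;
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        buffer.put("P" + id + "-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    buffer.producerDone();
                }
            });
        }
        for (int c = 0; c < consumers; c++) {
            threads[producers + c] = new Thread(() -> {
                try {
                    while (buffer.take() != null) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed " + run(4, 5, 3) + " items");
    }
}
```

With 4 producers of 5 items each and 3 consumers, all threads terminate and the consumers together retrieve 20 items. The exit test sits inside the same synchronized method as the emptiness test, so a consumer cannot mistake a momentarily empty buffer for the end of production.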