java - Doubts in code of multi-threading using ArrayBlockingQueue and mutex
I am trying to write multithreaded code, but I can't understand where to start. I have been banging my head over it. Please help me.
My task is:
- There is one queue of length 1, known as pending_tasks, that contains tasks which require processing.
- There is another queue of length 1, known as completed_tasks, that contains tasks which have completed processing and are ready to deliver.
My implementation thinking:
- First, make two blocking queues, pending_tasks and completed_tasks.
- One thread (the producer) listens for tasks that come from outside; if it gets one, it puts it into pending_tasks.
- One thread (the consumer) is ready to take tasks from pending_tasks, starts processing them, and afterwards puts them into completed_tasks.
- Then it comes back to pending_tasks, and whenever tasks come, it starts the same processing.
- Basically, it's a single-producer, single-consumer problem.
My confusion:
I know that it can be coded using ArrayBlockingQueue and a mutex, but I didn't understand how I can start this. I have a good understanding of mutexes (I read about them from a linked article), and I have a good understanding of BlockingQueue as well, since I have read lots of questions on this site.
Can you please give me some implementation guidance, so that I can write this multi-threaded code?
I already wrote some code for the same, but it does not achieve the final goal of my task.
Thanks in advance. Looking forward to your kind reply.
EDIT NO. 1
Please see my code below. This code works fine, but it has one piece of functionality missing. Please help me add it, or give me some guidance on how to do that.
The functionality is:
- When the producer thread puts a value in the pending_task queue, it waits for some time there. If the consumer delivers the result within that time, then it's OK. Otherwise, it says time out, and the producer takes another value and puts it in the pending_task queue, and the same process starts.
Please help me add the above functionality. I think we have to communicate between the producer thread and the consumer thread, and thread communication is done using a mutex (I think). Please help me implement this.
My code:
MultiThread class
    package multithread;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class MultiThread {
        // Shared queues; both are (re)created by the constructor.
        public static BlockingQueue<Integer> pending_task;
        public static BlockingQueue<Integer> completed_task;

        public MultiThread(int length) {
            pending_task = new ArrayBlockingQueue<Integer>(length, true);
            completed_task = new ArrayBlockingQueue<Integer>(length, true);
        }
    }
Producer class
    package multithread;

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Producer: trying to put value " + i + " in the pending queue");
                    MultiThread.pending_task.put(i); // blocks while the queue is full
                    System.out.println("Producer: put value " + i + " in the pending queue, now it is the consumer's turn");
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
Consumer class
    package multithread;

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Consumer: trying to take a value from the pending queue");
                    int val = MultiThread.pending_task.take(); // blocks while the queue is empty
                    System.out.println("Consumer: took a value, and it is " + val);
                    System.out.println("Consumer: processing starts");
                    Thread.sleep(1000); // simulated processing
                    System.out.println("Consumer: processing ends");
                    System.out.println("Consumer: trying to put the value in the completed queue, and the value is " + val);
                    MultiThread.completed_task.put(val);
                    System.out.println("Consumer: put into the completed queue");
                    // Serve this value to the corresponding user
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
DeliveryBoy class
    package multithread;

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class DeliveryBoy implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("DeliveryBoy: waiting for a value near the completed queue");
                    int val = MultiThread.completed_task.take();
                    System.out.println("DeliveryBoy: successfully took a value from the completed queue, and the value is " + val);
                    // Serve this value to the corresponding user
                } catch (InterruptedException ex) {
                    Logger.getLogger(DeliveryBoy.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
Test class
    package multithread;

    public class Test {
        public static void main(String[] args) {
            // TODO code application logic here
            MultiThread ml = new MultiThread(1);
            new Thread(new Producer()).start();
            new Thread(new Consumer()).start();
            new Thread(new DeliveryBoy()).start();
        }
    }
public void put(E e) throws InterruptedException
Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.
public E take() throws InterruptedException
Description copied from interface: BlockingQueue
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
So you just need to call these methods from your threads.
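To illustrate, here is a minimal sketch of a producer and a consumer sharing one queue of capacity 1, with no explicit mutex. The class name HandoffSketch and the method run are made up for this example and are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example class, not from the question.
public class HandoffSketch {

    // Runs one producer and one consumer over a queue of capacity 1 and
    // returns the values in the order the consumer received them.
    static List<Integer> run(int count) {
        BlockingQueue<Integer> pending = new ArrayBlockingQueue<>(1, true);
        List<Integer> received = new ArrayList<>(); // touched only by the consumer until join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    pending.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(pending.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

The queue does the waiting and the locking itself, so the two threads never need to coordinate through anything else.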
Try it (study the javadoc), and when you have a more specific problem you can ask again.
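For the timeout behaviour described in the question's edit, BlockingQueue also has a timed poll(long, TimeUnit) that returns null if nothing arrives in time. The following is only a rough sketch of one way to use it. The class TimeoutSketch, the methods submitAndWait, startWorker and demo, and the idea of using the task value as the processing time in milliseconds are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not from the question.
public class TimeoutSketch {

    // Hands one task to the worker, then waits up to timeoutMillis for a result.
    // Returns the result, or null if nothing arrived in time.
    static Integer submitAndWait(BlockingQueue<Integer> pending,
                                 BlockingQueue<Integer> completed,
                                 int task, long timeoutMillis) throws InterruptedException {
        pending.put(task);
        return completed.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Assumed worker: treats the task value as its processing time in ms.
    static Thread startWorker(BlockingQueue<Integer> pending,
                              BlockingQueue<Integer> completed) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    int task = pending.take();
                    Thread.sleep(task); // simulated processing
                    completed.put(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Submits a fast task and a slow task; returns both outcomes.
    static Integer[] demo() {
        BlockingQueue<Integer> pending = new ArrayBlockingQueue<>(1, true);
        BlockingQueue<Integer> completed = new ArrayBlockingQueue<>(1, true);
        Thread worker = startWorker(pending, completed);
        try {
            Integer fast = submitAndWait(pending, completed, 10, 1000);  // finishes well within 1 s
            Integer slow = submitAndWait(pending, completed, 2000, 100); // takes longer than 100 ms
            return new Integer[] { fast, slow };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new Integer[] { null, null };
        } finally {
            worker.interrupt(); // abandon the slow task
        }
    }

    public static void main(String[] args) {
        Integer[] r = demo();
        System.out.println("fast task: " + (r[0] == null ? "timed out" : r[0]));
        System.out.println("slow task: " + (r[1] == null ? "timed out" : r[1]));
    }
}
```

On an unloaded machine the first call should return 10 and the second should time out. Note one weakness of this sketch: if the worker is left running, the late result of a timed-out task still ends up in the completed queue and would be picked up by the next poll. Tagging each task with an id, or submitting tasks to an ExecutorService and calling Future.get(timeout, unit), are common ways around that.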