1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.core;
5 
6 /**
7  * FIFO queue with guarded suspension.
8  * <b>Purpose</b><br>
9  * <p/>
10 * <b>Implementation</b><br>
11 * FIFO queue class implemented with circular array. The enQueue() and
12 * deQueue() methods use guarded suspension according to a readers/writers
13 * pattern, implemented with java.lang.Object.wait()/notify().
14 * <p/>
15 * <b>Examples</b><br>
16 * <p/>
17 * <br>
18 *
19 * @author Just van den Broecke - Just Objects &copy;
20 * @version $Id: EventQueue.java,v 1.3 2007/11/23 14:33:07 justb Exp $
21 */
22public class EventQueue {
23    /**
24     * Defines maximum queue size
25     */
26    private int capacity = 8;
27    private Event[] queue = null;
28    private int front, rear;
29
30    /**
31     * Construct queue with default (8) capacity.
32     */
33    public EventQueue() {
34        this(8);
35    }
36
37    /**
38     * Construct queue with specified capacity.
39     */
40    public EventQueue(int capacity) {
41        this.capacity = capacity;
42        queue = new Event[capacity];
43        front = rear = 0;
44    }
45
46    /**
47     * Put item in queue; waits() indefinitely if queue is full.
48     */
49    public synchronized boolean enQueue(Event item) throws InterruptedException {
50        return enQueue(item, -1);
51    }
52
53    /**
54     * Put item in queue; if full wait maxtime.
55     */
56    public synchronized boolean enQueue(Event item, long maxWaitTime) throws InterruptedException {
57
58        // Wait (optional maxtime) as long as the queue is full
59        while (isFull()) {
60            if (maxWaitTime > 0) {
61                // Wait at most maximum time
62                wait(maxWaitTime);
63
64                // Timed out or woken; if still full we
65                // had bad luck and return failure.
66                if (isFull()) {
67                    return false;
68                }
69            } else {
70                wait();
71            }
72        }
73
74        // Put item in queue
75        queue[rear] = item;
76        rear = next(rear);
77
78        // Wake up waiters; NOTE: first waiter will eat item
79        notifyAll();
80        return true;
81    }
82
83    /**
84     * Get head; if empty wait until something in queue.
85     */
86    public synchronized Event deQueue() throws InterruptedException {
87        return deQueue(-1);
88    }
89
90    /**
91     * Get head; if empty wait for specified time at max.
92     */
93    public synchronized Event deQueue(long maxWaitTime) throws InterruptedException {
94        while (isEmpty()) {
95            if (maxWaitTime >= 0) {
96                wait(maxWaitTime);
97
98                // Timed out or woken; if still empty we
99                // had bad luck and return failure.
00                if (isEmpty()) {
01                    return null;
02                }
03            } else {
04                // Wait indefinitely for something in queue.
05                wait();
06            }
07        }
08
09        // Dequeue item
10        Event result = fetchNext();
11
12        // Notify possible wait()-ing enQueue()-ers
13        notifyAll();
14
15        // Return dequeued item
16        return result;
17    }
18
19    /**
20     * Get all queued Events.
21     */
22    public synchronized Event[] deQueueAll(long maxWaitTime) throws InterruptedException {
23        while (isEmpty()) {
24            if (maxWaitTime >= 0) {
25                wait(maxWaitTime);
26
27                // Timed out or woken; if still empty we
28                // had bad luck and return failure.
29                if (isEmpty()) {
30                    return null;
31                }
32            } else {
33                // Wait indefinitely for something in queue.
34                wait();
35            }
36        }
37
38        // Dequeue all items item
39        Event[] events = new Event[getSize()];
40        for (int i = 0; i < events.length; i++) {
41            events[i] = fetchNext();
42        }
43
44        // Notify possible wait()-ing enQueue()-ers
45        notifyAll();
46
47        // Return dequeued item
48        return events;
49    }
50
51    public synchronized int getSize() {
52        return (rear >= front) ? (rear - front) : (capacity - front + rear);
53    }
54
55    /**
56     * Is the queue empty ?
57     */
58    public synchronized boolean isEmpty() {
59        return front == rear;
60    }
61
62    /**
63     * Is the queue full ?
64     */
65    public synchronized boolean isFull() {
66        return (next(rear) == front);
67    }
68
69    /**
70     * Circular counter.
71     */
72    private int next(int index) {
73        return (index + 1 < capacity ? index + 1 : 0);
74    }
75
76    /**
77     * Circular counter.
78     */
79    private Event fetchNext() {
80        Event temp = queue[front];
81        queue[front] = null;
82        front = next(front);
83        return temp;
84    }
85
86    public static void p(String s) {
87        System.out.println(s);
88    }
89
90    public static void main(String[] args) {
91        EventQueue q = new EventQueue(8);
92        Event event = new Event("t");
93        try {
94            q.enQueue(event);
95            p("(1) size = " + q.getSize());
96            q.enQueue(event);
97            p("(2) size = " + q.getSize());
98            q.deQueue();
99            p("(1) size = " + q.getSize());
00            q.deQueue();
01            p("(0) size = " + q.getSize());
02
03            q.enQueue(event);
04            q.enQueue(event);
05            q.enQueue(event);
06            p("(3) size = " + q.getSize());
07            q.deQueue();
08            p("(2) size = " + q.getSize());
09            q.enQueue(event);
10            q.enQueue(event);
11            q.enQueue(event);
12            p("(5) size = " + q.getSize());
13            q.enQueue(event);
14            q.enQueue(event);
15            p("(7) size = " + q.getSize());
16            q.deQueue();
17            q.deQueue();
18            q.deQueue();
19            p("(4) size = " + q.getSize());
20            q.deQueue();
21            q.deQueue();
22            q.deQueue();
23            ;
24            q.deQueue();
25            p("(0) size = " + q.getSize());
26
27            q.enQueue(event);
28            q.enQueue(event);
29            q.enQueue(event);
30            q.enQueue(event);
31            q.enQueue(event);
32            p("(5) size = " + q.getSize());
33
34            q.deQueue();
35            q.deQueue();
36            q.deQueue();
37            ;
38            q.deQueue();
39            p("(1) size = " + q.getSize());
40        } catch (InterruptedException ie) {
41        }
42    }
43}
44
45/*
46* $Log: EventQueue.java,v $
47* Revision 1.3  2007/11/23 14:33:07  justb
48* core classes now configurable through factory
49*
50* Revision 1.2  2005/02/21 11:50:46  justb
51* ohase1 of refactoring Subscriber into Session/Controller/Subscriber
52*
53* Revision 1.1  2005/02/18 10:07:23  justb
54* many renamings of classes (make names compact)
55*
56* Revision 1.6  2005/02/16 12:16:16  justb
57* added support for "poll" mode
58*
59* Revision 1.5  2005/01/13 14:47:15  justb
60* control evt: send response on same (control) connection
61*
62* Revision 1.4  2004/09/03 22:35:37  justb
63* Almost complete rewrite, just checking in now
64*
65* Revision 1.3  2003/08/15 08:37:40  justb
66* fix/add Copyright+LGPL file headers and footers
67*
68*
69*/
70