1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.core;
5 
6 import nl.justobjects.pushlet.util.PushletException;
7 import nl.justobjects.pushlet.util.Rand;
8 import nl.justobjects.pushlet.util.Sys;
9 
10import java.util.Collections;
11import java.util.HashMap;
12import java.util.Map;
13import java.net.URLEncoder;
14
15/**
16 * Handles data channel between dispatcher and client.
17 *
18 * @author Just van den Broecke - Just Objects &copy;
19 * @version $Id: Subscriber.java,v 1.26 2007/11/23 14:33:07 justb Exp $
20 */
21public class Subscriber implements Protocol, ConfigDefs {
22    private Session session;
23
24    /**
25     * Blocking queue.
26     */
27    private EventQueue eventQueue = new EventQueue(Config.getIntProperty(QUEUE_SIZE));
28
29    /**
30     * URL to be used in refresh requests in pull/poll modes.
31     */
32    private long queueReadTimeoutMillis = Config.getLongProperty(QUEUE_READ_TIMEOUT_MILLIS);
33    private long queueWriteTimeoutMillis = Config.getLongProperty(QUEUE_WRITE_TIMEOUT_MILLIS);
34    private long refreshTimeoutMillis = Config.getLongProperty(PULL_REFRESH_TIMEOUT_MILLIS);
35    volatile long lastAlive = Sys.now();
36
37    /**
38     * Map of active subscriptions, keyed by their subscription id.
39     */
40    private Map subscriptions = Collections.synchronizedMap(new HashMap(3));
41
42    /**
43     * Are we able to accept/send events ?.
44     */
45    private volatile boolean active;
46
47    /**
48     * Transfer mode (stream, pull, poll).
49     */
50    private String mode;
51
52
53    /**
54     * Protected constructor as we create through factory method.
55     */
56    protected Subscriber() {
57    }
58
59    /**
60     * Create instance through factory method.
61     *
62     * @param aSession the parent Session
63     * @return a Subscriber object (or derived)
64     * @throws PushletException exception, usually misconfiguration
65     */
66    public static Subscriber create(Session aSession) throws PushletException {
67        Subscriber subscriber;
68        try {
69            subscriber = (Subscriber) Config.getClass(SUBSCRIBER_CLASS, "nl.justobjects.pushlet.core.Subscriber").newInstance();
70        } catch (Throwable t) {
71            throw new PushletException("Cannot instantiate Subscriber from config", t);
72        }
73
74        subscriber.session = aSession;
75        return subscriber;
76    }
77
78    public void start() {
79        active = true;
80    }
81
82    public void stop() {
83        removeSubscriptions();
84        active = false;
85    }
86
87    public void bailout() {
88        session.stop();
89    }
90
91    /**
92     * Are we still active to handle events.
93     */
94    public boolean isActive() {
95        return active;
96    }
97
98    /**
99     * Return client session.
00     */
01    public Session getSession() {
02        return session;
03    }
04
05    /**
06     * Get (session) id.
07     */
08    public String getId() {
09        return session.getId();
10    }
11
12    /**
13     * Return subscriptions.
14     */
15    public Subscription[] getSubscriptions() {
16        // todo: Optimize
17        return (Subscription[]) subscriptions.values().toArray(new Subscription[0]);
18    }
19
20    /**
21     * Add a subscription.
22     */
23    public Subscription addSubscription(String aSubject, String aLabel) throws PushletException {
24        Subscription subscription = Subscription.create(aSubject, aLabel);
25        subscriptions.put(subscription.getId(), subscription);
26        info("Subscription added subject=" + aSubject + " sid=" + subscription.getId() + " label=" + aLabel);
27        return subscription;
28    }
29
30    /**
31     * Remove a subscription.
32     */
33    public Subscription removeSubscription(String aSubscriptionId) {
34        Subscription subscription = (Subscription) subscriptions.remove(aSubscriptionId);
35        if (subscription == null) {
36            warn("No subscription found sid=" + aSubscriptionId);
37            return null;
38        }
39        info("Subscription removed subject=" + subscription.getSubject() + " sid=" + subscription.getId() + " label=" + subscription.getLabel());
40        return subscription;
41    }
42
43    /**
44     * Remove all subscriptions.
45     */
46    public void removeSubscriptions() {
47        subscriptions.clear();
48    }
49
50    public String getMode() {
51        return mode;
52    }
53
54    public void setMode(String aMode) {
55        mode = aMode;
56    }
57
58    public long getRefreshTimeMillis() {
59        String minWaitProperty = PULL_REFRESH_WAIT_MIN_MILLIS;
60        String maxWaitProperty = PULL_REFRESH_WAIT_MAX_MILLIS;
61        if (mode.equals((MODE_POLL))) {
62            minWaitProperty = POLL_REFRESH_WAIT_MIN_MILLIS;
63            maxWaitProperty = POLL_REFRESH_WAIT_MAX_MILLIS;
64
65        }
66        return Rand.randomLong(Config.getLongProperty(minWaitProperty),
67                Config.getLongProperty(maxWaitProperty));
68    }
69
70    /**
71     * Get events from queue and push to client.
72     */
73    public void fetchEvents(Command aCommand) throws PushletException {
74
75        String refreshURL = aCommand.httpReq.getRequestURI() + "?" + P_ID + "=" + session.getId() + "&" + P_EVENT + "=" + E_REFRESH;
76
77        // This is the only thing required to support "poll" mode
78        if (mode.equals(MODE_POLL)) {
79            queueReadTimeoutMillis = 0;
80            refreshTimeoutMillis = Config.getLongProperty(POLL_REFRESH_TIMEOUT_MILLIS);
81        }
82
83        // Required for fast bailout (tomcat)
84        aCommand.httpRsp.setBufferSize(128);
85
86        // Try to prevent caching in any form.
87        aCommand.sendResponseHeaders();
88
89        // Let clientAdapter determine how to send event
90        ClientAdapter clientAdapter = aCommand.getClientAdapter();
91        Event responseEvent = aCommand.getResponseEvent();
92        try {
93            clientAdapter.start();
94
95            // Send first event (usually hb-ack or listen-ack)
96            clientAdapter.push(responseEvent);
97
98            // In pull/poll mode and when response is listen-ack or join-listen-ack,
99            // return and force refresh immediately
00            // such that the client recieves response immediately over this channel.
01            // This is usually when loading the browser app for the first time
02            if ((mode.equals(MODE_POLL) || mode.equals(MODE_PULL))
03                    && responseEvent.getEventType().endsWith(Protocol.E_LISTEN_ACK)) {
04                sendRefresh(clientAdapter, refreshURL);
05
06                // We should come back later with refresh event...
07                return;
08            }
09        } catch (Throwable t) {
10            bailout();
11            return;
12        }
13
14
15        Event[] events = null;
16
17        // Main loop: as long as connected, get events and push to client
18        long eventSeqNr = 1;
19        while (isActive()) {
20            // Indicate we are still alive
21            lastAlive = Sys.now();
22
23            // Update session time to live
24            session.kick();
25
26            // Get next events; blocks until timeout or entire contents
27            // of event queue is returned. Note that "poll" mode
28            // will return immediately when queue is empty.
29            try {
30                // Put heartbeat in queue when starting to listen in stream mode
31                // This speeds up the return of *_LISTEN_ACK
32                if (mode.equals(MODE_STREAM) && eventSeqNr == 1) {
33                    eventQueue.enQueue(new Event(E_HEARTBEAT));
34                }
35
36                events = eventQueue.deQueueAll(queueReadTimeoutMillis);
37            } catch (InterruptedException ie) {
38                warn("interrupted");
39                bailout();
40            }
41
42            // Send heartbeat when no events received
43            if (events == null) {
44                events = new Event[1];
45                events[0] = new Event(E_HEARTBEAT);
46            }
47
48            // ASSERT: one or more events available
49
50            // Send events to client using adapter
51            // debug("received event count=" + events.length);
52            for (int i = 0; i < events.length; i++) {
53                // Check for abort event
54                if (events[i].getEventType().equals(E_ABORT)) {
55                    warn("Aborting Subscriber");
56                    bailout();
57                }
58
59                // Push next Event to client
60                try {
61                    // Set sequence number
62                    events[i].setField(P_SEQ, eventSeqNr++);
63
64                    // Push to client through client adapter
65                    clientAdapter.push(events[i]);
66                } catch (Throwable t) {
67                    bailout();
68                    return;
69                }
70            }
71
72            // Force client refresh request in pull or poll modes
73            if (mode.equals(MODE_PULL) || mode.equals(MODE_POLL)) {
74                sendRefresh(clientAdapter, refreshURL);
75
76                // Always leave loop in pull/poll mode
77                break;
78            }
79        }
80    }
81
82    /**
83     * Determine if we should receive event.
84     */
85    public Subscription match(Event event) {
86        Subscription[] subscriptions = getSubscriptions();
87        for (int i = 0; i < subscriptions.length; i++) {
88            if (subscriptions[i].match(event)) {
89                return subscriptions[i];
90            }
91        }
92        return null;
93    }
94
95    /**
96     * Event from Dispatcher: enqueue it.
97     */
98    public void onEvent(Event theEvent) {
99        if (!isActive()) {
00            return;
01        }
02
03        // p("send: queue event: "+theEvent.getSubject());
04
05        // Check if we had any active continuation for at
06        // least 'timeOut' millisecs. If the client has left this
07        // instance there would be no way of knowing otherwise.
08        long now = Sys.now();
09        if (now - lastAlive > refreshTimeoutMillis) {
10            warn("not alive for at least: " + refreshTimeoutMillis + "ms, leaving...");
11            bailout();
12            return;
13        }
14
15        // Put event in queue; leave if queue full
16        try {
17            if (!eventQueue.enQueue(theEvent, queueWriteTimeoutMillis)) {
18                warn("queue full, bailing out...");
19                bailout();
20            }
21
22            // ASSERTION : Event in queue.
23            // see fetchEvents() where Events are dequeued and pushed to the client.
24        } catch (InterruptedException ie) {
25            bailout();
26        }
27
28    }
29
30    /**
31     * Send refresh command to pull/poll clients.
32     */
33    protected void sendRefresh(ClientAdapter aClientAdapter, String aRefreshURL) {
34        Event refreshEvent = new Event(E_REFRESH);
35
36        // Set wait time and url for refresh
37        refreshEvent.setField(P_WAIT, "" + getRefreshTimeMillis());
38        refreshEvent.setField(P_URL, aRefreshURL);
39
40        try {
41            // Push to client through client adapter
42            aClientAdapter.push(refreshEvent);
43
44            // Stop this round until refresh event
45            aClientAdapter.stop();
46        } catch (Throwable t) {
47            // Leave on any exception
48            bailout();
49        }
50    }
51
52    /**
53     * Info.
54     */
55    protected void info(String s) {
56        session.info("[Subscriber] " + s);
57    }
58
59    /**
60     * Exceptional print util.
61     */
62    protected void warn(String s) {
63        session.warn("[Subscriber] " + s);
64    }
65
66    /**
67     * Exceptional print util.
68     */
69    protected void debug(String s) {
70        session.debug("[Subscriber] " + s);
71    }
72
73
74    public String toString() {
75        return session.toString();
76    }
77}
78
79/*
80 * $Log: Subscriber.java,v $
81 * Revision 1.26  2007/11/23 14:33:07  justb
82 * core classes now configurable through factory
83 *
84 * Revision 1.25  2007/11/10 15:53:15  justb
85 * put heartbeat in queue when start fetching events in stream-mode
86 *
87 * Revision 1.24  2006/10/19 12:33:40  justb
88 * add atomic join-listen support (one request)
89 *
90 * Revision 1.22  2006/05/06 00:06:28  justb
91 * first rough version AJAX client
92 *
93 * Revision 1.21  2005/02/28 12:45:59  justb
94 * introduced Command class
95 *
96 * Revision 1.20  2005/02/21 16:59:09  justb
97 * SessionManager and session lease introduced
98 *
99 * Revision 1.19  2005/02/21 12:32:28  justb
00 * fixed publish event in Controller
01 *
02 * Revision 1.18  2005/02/21 11:50:46  justb
03 * ohase1 of refactoring Subscriber into Session/Controller/Subscriber
04 *
05 * Revision 1.17  2005/02/20 13:05:32  justb
06 * removed the Postlet (integrated in Pushlet protocol)
07 *
08 * Revision 1.16  2005/02/18 12:36:47  justb
09 * changes for renaming and configurability
10 *
11 * Revision 1.15  2005/02/18 10:07:23  justb
12 * many renamings of classes (make names compact)
13 *
14 * Revision 1.14  2005/02/18 09:54:15  justb
15 * refactor: rename Publisher Dispatcher and single Subscriber class
16 *
17 * Revision 1.13  2005/02/16 14:39:34  justb
18 * fixed leave handling and added "poll" mode
19 *
20 * Revision 1.12  2005/01/24 13:42:00  justb
21 * new protocol changes (p_listen)
22 *
23 * Revision 1.11  2005/01/13 14:47:15  justb
24 * control evt: send response on same (control) connection
25 *
26 * Revision 1.10  2004/10/24 20:50:35  justb
27 * refine subscription with label and sending sid and label on events
28 *
29 * Revision 1.9  2004/10/24 12:58:18  justb
30 * revised client and test classes for new protocol
31 *
32 * Revision 1.8  2004/09/26 21:39:43  justb
33 * allow multiple subscriptions and out-of-band requests
34 *
35 * Revision 1.7  2004/09/20 22:01:38  justb
36 * more changes for new protocol
37 *
38 * Revision 1.6  2004/09/03 22:35:37  justb
39 * Almost complete rewrite, just checking in now
40 *
41 * Revision 1.5  2004/08/13 23:36:05  justb
42 * rewrite of Pullet into Pushlet "pull" mode
43 *
44 * Revision 1.4  2004/03/10 14:01:55  justb
45 * formatting and *Subscriber refactoring
46 *
47 * Revision 1.3  2003/08/15 08:37:40  justb
48 * fix/add Copyright+LGPL file headers and footers
49 *
50 * Revision 1.2  2003/05/18 16:15:08  justb
51 * support for XML encoded Events
52 *
53 * Revision 1.1.1.1  2002/09/24 21:02:32  justb
54 * import to sourceforge
55 *
56 * Revision 1.1.1.1  2002/09/20 22:48:18  justb
57 * import to SF
58 *
59 * Revision 1.1.1.1  2002/09/20 14:19:04  justb
60 * first import into SF
61 *
62 * Revision 1.3  2002/04/15 20:42:41  just
63 * reformatting and renaming GuardedQueue to EventQueue
64 *
65 * Revision 1.2  2000/08/21 20:48:29  just
66 * added CVS log and id tags plus copyrights
67 *
68 *
69 */
70