1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.core;
5 
6 import nl.justobjects.pushlet.util.PushletException;
7 
8 import java.io.IOException;
9 
10/**
11 * Handles servlet requests from client.
12 *
13 * @author Just van den Broecke - Just Objects &copy;
14 * @version $Id: Controller.java,v 1.9 2007/11/23 14:33:07 justb Exp $
15 */
16public class Controller implements Protocol, ConfigDefs {
17
18    private Session session;
19
20    /**
21     * Protected constructor as we create through factory method.
22     */
23    protected Controller() {
24    }
25
26    /**
27     * Create instance through factory method.
28     *
29     * @param aSession the parent Session
30     * @return a Controller object (or derived)
31     * @throws PushletException exception, usually misconfiguration
32     */
33    public static Controller create(Session aSession) throws PushletException {
34        Controller controller;
35        try {
36            controller = (Controller) Config.getClass(CONTROLLER_CLASS, "nl.justobjects.pushlet.core.Controller").newInstance();
37        } catch (Throwable t) {
38            throw new PushletException("Cannot instantiate Controller from config", t);
39        }
40        controller.session = aSession;
41        return controller;
42    }
43
44    /**
45     * Handle command.
46     */
47    public void doCommand(Command aCommand) {
48        try {
49            // Update lease time to live
50            session.kick();
51
52            // Set remote IP address of client
53            session.setAddress(aCommand.httpReq.getRemoteAddr());
54
55            debug("doCommand() event=" + aCommand.reqEvent);
56
57            // Get event type
58            String eventType = aCommand.reqEvent.getEventType();
59
60            // Determine action based on event type
61            if (eventType.equals(Protocol.E_REFRESH)) {
62                // Pull/poll mode clients that refresh
63                doRefresh(aCommand);
64            } else if (eventType.equals(Protocol.E_SUBSCRIBE)) {
65                // Subscribe
66                doSubscribe(aCommand);
67            } else if (eventType.equals(Protocol.E_UNSUBSCRIBE)) {
68                // Unsubscribe
69                doUnsubscribe(aCommand);
70            } else if (eventType.equals(Protocol.E_JOIN)) {
71                // Join
72                doJoin(aCommand);
73            } else if (eventType.equals(Protocol.E_JOIN_LISTEN)) {
74                // Join and listen (for simple and e.g. REST apps)
75                doJoinListen(aCommand);
76            } else if (eventType.equals(Protocol.E_LEAVE)) {
77                // Leave
78                doLeave(aCommand);
79            } else if (eventType.equals(Protocol.E_HEARTBEAT)) {
80                // Heartbeat mainly to do away with browser "busy" cursor
81                doHeartbeat(aCommand);
82            } else if (eventType.equals(Protocol.E_PUBLISH)) {
83                // Publish event
84                doPublish(aCommand);
85            } else if (eventType.equals(Protocol.E_LISTEN)) {
86                // Listen to pushed events
87                doListen(aCommand);
88            }
89
90            // Handle response back to client
91            if (eventType.endsWith(Protocol.E_LISTEN) ||
92                    eventType.equals(Protocol.E_REFRESH)) {
93                // Data channel events
94                // Loops until refresh or connection closed
95                getSubscriber().fetchEvents(aCommand);
96
97            } else {
98                // Send response for control commands
99                sendControlResponse(aCommand);
00            }
01
02        } catch (Throwable t) {
03            warn("Exception in doCommand(): " + t);
04            t.printStackTrace();
05        }
06    }
07
08    public String toString() {
09        return session.toString();
10    }
11
12    /**
13     * Handle heartbeat event.
14     */
15    protected void doHeartbeat(Command aCommand) {
16
17        // Set heartbeat acknowledgement to client
18        aCommand.setResponseEvent(new Event(E_HEARTBEAT_ACK));
19    }
20
21    /**
22     * Handle Join request.
23     */
24    protected void doJoin(Command aCommand) throws PushletException {
25
26        Event responseEvent = null;
27
28        try {
29
30            session.start();
31
32            // Determine format for encoding Events to client.
33            // Default assume a userAgent window on the other end.
34            String format = aCommand.reqEvent.getField(P_FORMAT, FORMAT_JAVASCRIPT);
35
36            session.setFormat(format);
37            responseEvent = new Event(E_JOIN_ACK);
38
39            // Set unique subscriber id and encoding format
40            responseEvent.setField(P_ID, session.getId());
41            responseEvent.setField(P_FORMAT, format);
42            info("joined");
43        } catch (Throwable t) {
44            session.stop();
45            responseEvent = new Event(E_NACK);
46            responseEvent.setField(P_ID, session.getId());
47            responseEvent.setField(P_REASON, "unexpected error: " + t);
48            warn("doJoin() error: " + t);
49            t.printStackTrace();
50        } finally {
51            // Always set response event in command
52            aCommand.setResponseEvent(responseEvent);
53        }
54
55    }
56
57    /**
58     * Handle JoinListen request.
59     */
60    protected void doJoinListen(Command aCommand) throws PushletException {
61
62        // Basically bundles a join and a listen
63        // This request is handly for simple apps that
64        // need to do a single request to get events immediately
65        // For example in RESTful apps.
66
67        // First do regular join
68        doJoin(aCommand);
69        if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) {
70            // If successful do the listen
71            doListen(aCommand);
72            if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) {
73                // If still ok do the listen ack
74                aCommand.getResponseEvent().setField(P_EVENT, E_JOIN_LISTEN_ACK);
75            }
76        }
77    }
78
79    /**
80     * Handle Leave request.
81     */
82    protected void doLeave(Command aCommand) throws IOException {
83
84        Event responseEvent = null;
85
86        try {
87            // Also removes all subscriptions
88            session.stop();
89
90            // Prepare acknowledgement
91            responseEvent = new Event(E_LEAVE_ACK);
92
93            // Set unique subscriber id
94            responseEvent.setField(P_ID, session.getId());
95            info("left");
96        } catch (Throwable t) {
97            responseEvent = new Event(E_NACK);
98            responseEvent.setField(P_ID, session.getId());
99            responseEvent.setField(P_REASON, "unexpected error: " + t);
00            warn("doLeave() error: " + t);
01            t.printStackTrace();
02        } finally {
03            // Always set response event in command
04            aCommand.setResponseEvent(responseEvent);
05        }
06
07    }
08
09    /**
10     * Handle Listen request.
11     */
12    protected void doListen(Command aCommand) throws PushletException {
13
14
15        String mode = MODE_STREAM;
16        // Should we always force "pull" mode ?
17        if (Config.getBoolProperty(LISTEN_FORCE_PULL_ALL)) {
18            mode = MODE_PULL;
19        } else {
20            // Determine optimal mode determined by parameter and/or user agent
21            // Mode param determines how events are transfered to the client
22
23            // In "stream" mode, a stream of events is sent, i.e. the document
24            // is neverending. In "pull" or "poll" mode a complete document is returned
25            // ending with a request to refresh.
26            mode = aCommand.reqEvent.getField(P_MODE, MODE_STREAM);
27
28            String userAgent = aCommand.httpReq.getHeader("User-Agent");
29            if (userAgent != null) {
30                userAgent = userAgent.toLowerCase();
31                for (int i = 0; i < session.FORCED_PULL_AGENTS.length; i++) {
32                    if ((userAgent.indexOf(session.FORCED_PULL_AGENTS[i]) != -1)) {
33                        info("Forcing pull mode for agent=" + userAgent);
34                        mode = MODE_PULL;
35                        break;
36                    }
37                }
38            } else {
39                userAgent = "unknown";
40            }
41        }
42
43        getSubscriber().setMode(mode);
44
45        // Prepare acknowledgement
46        Event listenAckEvent = new Event(E_LISTEN_ACK);
47
48        // Add subscription(s) if subject(s) specified
49        String subject = aCommand.reqEvent.getField(P_SUBJECT);
50        if (subject != null) {
51            // Optional label for subscription
52            String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL);
53
54            // Add a subscription
55            Subscription subscription = getSubscriber().addSubscription(subject, label);
56
57            // Add subscription id and optional label to listen-ack event
58            listenAckEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
59            if (label != null) {
60                listenAckEvent.setField(P_SUBSCRIPTION_LABEL, label);
61            }
62        }
63
64        // Set unique subscriber id, push mode and encoding format
65        listenAckEvent.setField(P_ID, session.getId());
66        listenAckEvent.setField(P_MODE, mode);
67        listenAckEvent.setField(P_FORMAT, session.getFormat());
68
69        // Activate the subscriber
70        getSubscriber().start();
71
72        // Enqueue listen ack event on data channel
73        aCommand.setResponseEvent(listenAckEvent);
74
75        info("Listening mode=" + mode + " userAgent=" + session.getUserAgent());
76
77    }
78
79    /**
80     * Handle Publish request.
81     */
82    protected void doPublish(Command aCommand) {
83        Event responseEvent = null;
84
85        try {
86            String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT);
87            if (subject == null) {
88                // Return error response
89                responseEvent = new Event(E_NACK);
90                responseEvent.setField(P_ID, session.getId());
91                responseEvent.setField(P_REASON, "no subject provided");
92            } else {
93                aCommand.reqEvent.setField(P_FROM, session.getId());
94                aCommand.reqEvent.setField(P_EVENT, E_DATA);
95
96                // Event may be targeted to specific user (p_to field)
97                String to = aCommand.reqEvent.getField(P_TO);
98                if (to != null) {
99                    Dispatcher.getInstance().unicast(aCommand.reqEvent, to);
00                } else {
01                    // No to: multicast
02                    debug("doPublish() event=" + aCommand.reqEvent);
03                    Dispatcher.getInstance().multicast(aCommand.reqEvent);
04                }
05
06                // Acknowledge
07                responseEvent = new Event(E_PUBLISH_ACK);
08            }
09
10        } catch (Throwable t) {
11            responseEvent = new Event(E_NACK);
12            responseEvent.setField(P_ID, session.getId());
13            responseEvent.setField(P_REASON, "unexpected error: " + t);
14            warn("doPublish() error: " + t);
15            t.printStackTrace();
16        } finally {
17            // Always set response event in command
18            aCommand.setResponseEvent(responseEvent);
19        }
20    }
21
22    /**
23     * Handle refresh event.
24     */
25    protected void doRefresh(Command aCommand) {
26        // Set ack
27        aCommand.setResponseEvent(new Event(E_REFRESH_ACK));
28    }
29
30    /**
31     * Handle Subscribe request.
32     */
33    protected void doSubscribe(Command aCommand) throws IOException {
34
35        Event responseEvent = null;
36        try {
37            String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT);
38            Subscription subscription = null;
39            if (subject == null) {
40                // Return error response
41                responseEvent = new Event(E_NACK);
42                responseEvent.setField(P_ID, session.getId());
43                responseEvent.setField(P_REASON, "no subject provided");
44            } else {
45
46                String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL);
47                subscription = getSubscriber().addSubscription(subject, label);
48
49                // Acknowledge
50                responseEvent = new Event(E_SUBSCRIBE_ACK);
51                responseEvent.setField(P_ID, session.getId());
52                responseEvent.setField(P_SUBJECT, subject);
53                responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
54                if (label != null) {
55                    responseEvent.setField(P_SUBSCRIPTION_LABEL, label);
56                }
57                info("subscribed to " + subject + " sid=" + subscription.getId());
58            }
59
60        } catch (Throwable t) {
61            responseEvent = new Event(E_NACK);
62            responseEvent.setField(P_ID, session.getId());
63            responseEvent.setField(P_REASON, "unexpected error: " + t);
64            warn("doSubscribe() error: " + t);
65            t.printStackTrace();
66        } finally {
67            // Always set response event in command
68            aCommand.setResponseEvent(responseEvent);
69        }
70    }
71
72    /**
73     * Handle Unsubscribe request.
74     */
75    protected void doUnsubscribe(Command aCommand) throws IOException {
76
77
78        Event responseEvent = null;
79        try {
80            String subscriptionId = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_ID);
81            if (subscriptionId == null) {
82                // Unsuscbribe all
83                getSubscriber().removeSubscriptions();
84                responseEvent = new Event(E_UNSUBSCRIBE_ACK);
85                responseEvent.setField(P_ID, session.getId());
86                info("unsubscribed all");
87            } else {
88                // Subscription id provided: remove Subscription
89                Subscription subscription = getSubscriber().removeSubscription(subscriptionId);
90                if (subscription == null) {
91                    // Unknown subscription id: return error response
92                    responseEvent = new Event(E_NACK);
93                    responseEvent.setField(P_ID, session.getId());
94                    responseEvent.setField(P_REASON, "no subscription for sid=" + subscriptionId);
95                    warn("unsubscribe: no subscription for sid=" + subscriptionId);
96                } else {
97                    // OK return ack
98                    responseEvent = new Event(E_UNSUBSCRIBE_ACK);
99                    responseEvent.setField(P_ID, session.getId());
00                    responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
01                    responseEvent.setField(P_SUBJECT, subscription.getSubject());
02                    if (subscription.getLabel() != null) {
03                        responseEvent.setField(P_SUBSCRIPTION_LABEL, subscription.getLabel());
04                    }
05                    info("unsubscribed sid= " + subscriptionId);
06                }
07            }
08        } catch (Throwable t) {
09            responseEvent = new Event(E_NACK);
10            responseEvent.setField(P_ID, session.getId());
11            responseEvent.setField(P_REASON, "unexpected error: " + t);
12            warn("doUnsubscribe() error: " + t);
13            t.printStackTrace();
14        } finally {
15            // Always set response event in command
16            aCommand.setResponseEvent(responseEvent);
17        }
18    }
19
20    public Subscriber getSubscriber() {
21        return session.getSubscriber();
22    }
23
24    /**
25     * Send response on the control channel.
26     */
27    protected void sendControlResponse(Command aCommand) {
28        try {
29
30            // Try to prevent caching in any form.
31            aCommand.sendResponseHeaders();
32
33            // Let clientAdapter determine how to send event
34            aCommand.getClientAdapter().start();
35
36            // Push to client through client adapter
37            aCommand.getClientAdapter().push(aCommand.getResponseEvent());
38
39            // One shot response
40            aCommand.getClientAdapter().stop();
41        } catch (Throwable t) {
42            session.stop();
43        }
44    }
45
46
47    /**
48     * Info.
49     */
50    protected void info(String s) {
51        session.info("[Controller] " + s);
52    }
53
54    /**
55     * Exceptional print util.
56     */
57    protected void warn(String s) {
58        session.warn("[Controller] " + s);
59    }
60
61    /**
62     * Exceptional print util.
63     */
64    protected void debug(String s) {
65        session.debug("[Controller] " + s);
66    }
67
68
69}
70
71/*
72 * $Log: Controller.java,v $
73 * Revision 1.9  2007/11/23 14:33:07  justb
74 * core classes now configurable through factory
75 *
76 * Revision 1.8  2005/02/28 15:58:05  justb
77 * added SimpleListener example
78 *
79 * Revision 1.7  2005/02/28 13:05:59  justb
80 * introduced join-listen protocol service
81 *
82 * Revision 1.6  2005/02/28 12:45:59  justb
83 * introduced Command class
84 *
85 * Revision 1.5  2005/02/28 09:14:55  justb
86 * sessmgr/dispatcher factory/singleton support
87 *
88 * Revision 1.4  2005/02/25 15:13:00  justb
89 * session id generation more robust
90 *
91 * Revision 1.3  2005/02/21 16:59:06  justb
92 * SessionManager and session lease introduced
93 *
94 * Revision 1.2  2005/02/21 12:32:28  justb
95 * fixed publish event in Controller
96 *
97 * Revision 1.1  2005/02/21 11:50:46  justb
 * phase1 of refactoring Subscriber into Session/Controller/Subscriber
99 *
00
01 *
02 */
03