1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.test;
5 
6 import nl.justobjects.pushlet.core.Dispatcher;
7 import nl.justobjects.pushlet.core.Event;
8 import nl.justobjects.pushlet.core.EventSource;
9 import nl.justobjects.pushlet.util.Rand;
10
11import java.io.BufferedReader;
12import java.io.IOException;
13import java.io.InputStream;
14import java.io.InputStreamReader;
15import java.net.URL;
16import java.util.StringTokenizer;
17import java.util.Vector;
18
19/**
20 * Event sources that push events (for testing).
21 *
22 * @author Just van den Broecke - Just Objects &copy;
23 * @version $Id: TestEventPushSources.java,v 1.10 2007/11/09 13:16:57 justb Exp $
24 */
25public class TestEventPushSources {
26
27    static public class AEXStocksEventPushSourceABN {
28        String pageURL = "http://ri2.rois.com/E36msPtnZC0e15CVb4KT97JAGfGSfCcrvv6*FcyZIoNyh/CTIB/RI2APISNAP?RIC=0%23.AEX&FORMAT=XML";
29        // This could be further expanded: getting the Reuters AEX stocks
30        // as XML from ABN with this URL, but we may get into legal problems...
31    }
32
33    /**
34     * Produces events from REAL stocks from the AEX.
35     */
36    static public class AEXStocksEventPushSource implements EventSource, Runnable {
37        /**
38         * Here we get our stocks from.
39         */
40        String pageURL = "http://www.debeurs.nl/koersen/aex.asp";
41        Thread thread = null;
42        volatile boolean active = false;
43
44        // Since Baan has been thrown out...
45        public final static int NR_OF_STOCKS = 24;
46
47        public final static String EMPTY = "wait...";
48        private int restarts = 1;
49
50        class Stock {
51            public String name = EMPTY;
52            public String rate = EMPTY;
53            volatile public boolean modified = false;
54        }
55
56        Vector stocksCache = new Vector(NR_OF_STOCKS);
57
58        public AEXStocksEventPushSource() {
59            for (int i = 0; i < NR_OF_STOCKS; i++) {
60                stocksCache.addElement(new Stock());
61            }
62            // updateCache();
63        }
64
65        /**
66         * Activate the event source.
67         */
68        synchronized public void activate() {
69            e("activating...");
70            // Stop a possibly running thread
71            stopThread();
72
73            // Start new thread and
74            thread = new Thread(this, "AEXStocksPublisher-" + (restarts++));
75            active = true;
76            thread.start();
77            e("activated");
78        }
79
80        /**
81         * Deactivate the event source.
82         */
83        synchronized public void passivate() {
84            e("passivating...");
85            active = false;
86            stopThread();
87
88            // Mark the cache modified so we'll send the contents
89            // on the next activation.
90            for (int i = 0; i < NR_OF_STOCKS; i++) {
91                ((Stock) stocksCache.elementAt(i)).modified = true;
92            }
93
94            e("passivated");
95        }
96
97
98        /**
99         * Deactivate the event source.
00         */
01        synchronized public void stop() {
02        }
03
04        public void run() {
05            // Publish cache content (if any) first.
06            publishStocks();
07
08            int count = 5; // enforce update first
09            while (active) {
10
11                // Only do work if active
12                // Update cache every 10 secs.
13                if (count == 5) {
14                    updateCache();
15                    count = 0;
16                }
17                count++;
18
19                // Do updates for changed stock rates
20                sendUpdates();
21
22                // If we were interrupted just return.
23                if (thread == null || thread.isInterrupted()) {
24                    break;
25                }
26
27                // Sleep 2 secs before sending next updates
28                try {
29                    thread.sleep(2000);
30                } catch (InterruptedException ie) {
31                    break;
32                }
33            }
34
35            // Loop terminated: reset vars
36            thread = null;
37            active = false;
38        }
39
40        private String getStocksLine() {
41            BufferedReader br = null;
42            InputStream is = null;
43            String nextLine = "";
44
45            // Read line from server
46            try {
47                is = new URL(pageURL).openStream();
48                br = new BufferedReader(new InputStreamReader(is));
49                boolean foundLine = false;
50                while (!foundLine) {
51                    nextLine = br.readLine();
52                    if (nextLine == null) {
53                        return "";
54                    }
55                    foundLine = (nextLine.indexOf("details.asp?iid=14053&parent=aex") != -1);
56                }
57            } catch (Exception e) {
58                e("could not open or read URL pageURL=" + pageURL + " ex=" + e);
59                return "";
60            } finally {
61                try {
62                    if (is != null) is.close();
63                } catch (IOException ignore) {
64                }
65            }
66            return nextLine;
67        }
68
69        private void publishStocks() {
70            // Publish only modified stocks from the cache
71            for (int i = 0; i < NR_OF_STOCKS; i++) {
72                Stock nextStock = (Stock) stocksCache.elementAt(i);
73
74                // Publish modified stocks
75                if (nextStock.modified) {
76                    publishStock(i, nextStock.name, nextStock.rate);
77                    nextStock.modified = false;
78                    try {
79                        Thread.sleep(400);
80                    } catch (InterruptedException ie) {
81                        return;
82                    }
83                }
84            }
85        }
86
87        private void publishStock(int index, String name, String rate) {
88            Event event = Event.createDataEvent("/stocks/aex");
89            event.setField("number", index + "");
90            event.setField("name", name);
91            event.setField("rate", rate);
92            p("publish: nr=" + index + " name=" + name + " rate=" + rate);
93            Dispatcher.getInstance().multicast(event);
94        }
95
96        private void sendUpdates() {
97            p("sending updates");
98            // In any case send a random stock value by
99            // making it modified, just to see something moving...
00            int randomIndex = Rand.randomInt(0, NR_OF_STOCKS - 1);
01            Stock randomStock = (Stock) stocksCache.elementAt(randomIndex);
02            randomStock.modified = true;
03
04            publishStocks();
05        }
06
07        private void stopThread() {
08            if (thread != null) {
09                thread.interrupt();
10                thread = null;
11            }
12        }
13
14        private void updateCache() {
15            p("updating Cache");
16
17            // Get the line with all stocks from HTML page
18            String stocksLine = getStocksLine();
19            if ("".equals(stocksLine)) {
20                e("updateCache: stocksLine == null");
21                return;
22            }
23
24            // Parse the stocksline and put in cache.
25            // Beware: this is the messy part!!
26            // We assume that stock/names and rates are located at
27            // regular positions in the line.
28            String delim = "<>";
29            StringTokenizer st = new StringTokenizer(stocksLine, delim);
30            String nextToken = "";
31            int count = 0;
32            String nextStock = "";
33            String nextQuote = "";
34            String currentQuote = null;
35            int index = -1;
36            while (st.hasMoreTokens()) {
37                nextToken = st.nextToken();
38                count++;
39                // The <TD> with the stock name
40                if ((count - 5) % 57 == 0) {
41                    p("c=" + count + " s=" + nextToken);
42                    nextStock = nextToken;
43                }
44
45                // The <TD> with the stock rate
46                if ((count - 10) % 57 == 0) {
47                    nextQuote = nextToken;
48                    index++;
49                    p("c=" + count + " val=" + nextQuote);
50                    Stock currentStock = (Stock) stocksCache.elementAt(index);
51
52                    // Only update new or modified stocks
53                    if (EMPTY.equals(currentStock.rate) || !currentStock.rate.equals(nextQuote)) {
54                        p("modified: " + nextStock);
55                        currentStock.name = nextStock;
56                        currentStock.rate = nextQuote;
57                        currentStock.modified = true;
58                    }
59                }
60            }
61        }
62    }
63
64
65    /**
66     * Util: stderr print method.
67     */
68    public static void e(String s) {
69        System.out.println("AEXStocksEventPushSource: " + s);
70    }
71
72    /**
73     * Util: stdout print method.
74     */
75    public static void p(String s) {
76        // System.out.println(s);
77    }
78
79
80    public static void main(String[] args) {
81        // new TestEventPushSources$AEXStocksEventPushSource();
82    }
83}
84
85/*
86 * $Log: TestEventPushSources.java,v $
87 * Revision 1.10  2007/11/09 13:16:57  justb
 * use Rand from util package (and add Rand.java to pushlet client jar)
89 *
90 * Revision 1.9  2005/02/28 09:14:56  justb
91 * sessmgr/dispatcher factory/singleton support
92 *
93 * Revision 1.8  2005/02/21 16:59:17  justb
94 * SessionManager and session lease introduced
95 *
96 * Revision 1.7  2005/02/18 10:07:23  justb
97 * many renamings of classes (make names compact)
98 *
99 * Revision 1.6  2005/02/18 09:54:15  justb
00 * refactor: rename Publisher Dispatcher and single Subscriber class
01 *
02 * Revision 1.5  2004/09/03 22:35:37  justb
03 * Almost complete rewrite, just checking in now
04 *
05 * Revision 1.4  2004/03/10 15:45:55  justb
06 * many cosmetic changes
07 *
08 * Revision 1.3  2003/08/15 08:37:41  justb
09 * fix/add Copyright+LGPL file headers and footers
10 *
11 * Revision 1.2  2003/05/18 16:15:08  justb
12 * support for XML encoded Events
13 *
14 * Revision 1.1.1.1  2002/09/24 21:02:33  justb
15 * import to sourceforge
16 *
17 * Revision 1.1.1.1  2002/09/20 22:48:20  justb
18 * import to SF
19 *
20 * Revision 1.1.1.1  2002/09/20 14:19:02  justb
21 * first import into SF
22 *
23 * Revision 1.6  2001/02/18 23:45:13  just
24 * fixes for AEX
25 *
26 * Revision 1.5  2000/10/30 14:16:09  just
27 * no message
28 *
29 * Revision 1.4  2000/09/24 21:02:43  just
 * changes due to changed webpage in debeurs.nl
31 *
32 * Revision 1.3  2000/08/31 12:49:50  just
33 * added CVS comment tags for log and copyright
34 *
35 *
36 */
37