1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.test;
5 
6 import nl.justobjects.pushlet.client.PushletClient;
7 import nl.justobjects.pushlet.client.PushletClientListener;
8 import nl.justobjects.pushlet.core.Event;
9 import nl.justobjects.pushlet.core.Protocol;
10import nl.justobjects.pushlet.util.PushletException;
11import nl.justobjects.pushlet.util.Rand;
12
13import java.util.HashMap;
14import java.util.Map;
15
16/**
17 * Tester to demonstrate Pushlet use in Java applications.
18 * <p/>
19 * This program does two things:
20 * (1) it subscribes to the subject "test/ping"
21 * (2) it publishes an Event with subject "/test/ping" every few seconds.
22 *
23 * @author Just van den Broecke - Just Objects &copy;
24 * @version $Id: StressTester.java,v 1.2 2007/11/09 13:16:57 justb Exp $
25 */
26public class StressTester implements Protocol {
27    static private String host = "localhost";
28    static private int port = 8080;
29    static private int TESTER_COUNT = 10;
30    private static final String SUBJECT = "/test/ping";
31    private static final long MIN_PUBLISH_INTERVAL_MILLIS = 200;
32    private static final long MAX_PUBLISH_INTERVAL_MILLIS = 1000;
33    private static final long MIN_SUBSCRIBER_INTERVAL_MILLIS = 500;
34    private static final long MAX_SUBSCRIBER_INTERVAL_MILLIS = 1000;
35
36    public StressTester() {
37    }
38
39    public void run() {
40        new EventPublisher().start();
41        new EventSubscriber().start();
42    }
43
44    /**
45     * Generic print.
46     */
47    public void err(String s) {
48        System.out.println("[StressTester] ERROR" + s);
49    }
50
51    /**
52     * Generic print.
53     */
54    public void p(String s) {
55        System.out.println("[StressTester] " + s);
56    }
57
58    private class EventSubscriber extends Thread implements PushletClientListener {
59        private PushletClient pushletClient;
60
61        public void run() {
62            while (true) {
63                // Create and start a Pushlet client; we receive callbacks
64                // through onHeartbeat() and onData().
65                try {
66                    pushletClient = new PushletClient(host, port);
67                    // pushletClient.setDebug(true);
68                    pushletClient.join();
69                    pushletClient.listen(this, Protocol.MODE_STREAM);
70                    //p("listening");
71                    // Test subscribe/unsubscribe
72                    String subscriptionId = pushletClient.subscribe(SUBJECT);
73                    pushletClient.unsubscribe(subscriptionId);
74
75                    // The real subscribe
76                    subscriptionId = pushletClient.subscribe(SUBJECT);
77                    //p("sleeping");
78                    sleepRandom();
79                    //p("leaving");
80                    pushletClient.unsubscribe(subscriptionId);
81                    pushletClient.leave();
82
83                } catch (Throwable t) {
84                    err("Error in EventSubscriber t=" + t);
85                    return;
86                }
87            }
88        }
89
90        /**
91         * Error occurred.
92         */
93        public void onError(String message) {
94            // p(message);
95        }
96
97        /**
98         * Abort event from server.
99         */
00        public void onAbort(Event theEvent) {
01            //p("onAbort received: " + theEvent);
02        }
03
04        /**
05         * Data event from server.
06         */
07        public void onData(Event theEvent) {
08            // Calculate round trip delay
09            long then = Long.parseLong(theEvent.getField("time"));
10            long delay = System.currentTimeMillis() - then;
11            //p("onData: ping #" + theEvent.getField("seqNr") + " in " + delay + " ms");
12        }
13
14        /**
15         * Heartbeat event from server.
16         */
17        public void onHeartbeat(Event theEvent) {
18            //p("onHeartbeat received: " + theEvent);
19        }
20
21        private void sleepRandom() throws InterruptedException {
22            Thread.sleep(Rand.randomLong(MIN_SUBSCRIBER_INTERVAL_MILLIS, MAX_SUBSCRIBER_INTERVAL_MILLIS));
23        }
24    }
25
26    private class EventPublisher extends Thread {
27        private PushletClient pushletClient;
28
29        public void run() {
30            // Create and start a Pushlet client; we receive callbacks
31            // through onHeartbeat() and onData().
32            try {
33                pushletClient = new PushletClient(host, port);
34                pushletClient.join();
35
36                // p("pushletClient started");
37            } catch (PushletException pe) {
38                err("Error in EventPublisher pe=" + pe);
39                return;
40            }
41
42            // Publish an event to the server every N seconds.
43            Map eventData = new HashMap(2);
44            int seqNr = 1;
45            while (true) {
46                try {
47                    // Create event data
48                    eventData.put("seqNr", "" + seqNr++);
49                    eventData.put("time", "" + System.currentTimeMillis());
50
51                    // POST event to pushlet server
52                    pushletClient.publish(SUBJECT, eventData);
53
54                    Thread.sleep(Rand.randomLong(MIN_PUBLISH_INTERVAL_MILLIS, MAX_PUBLISH_INTERVAL_MILLIS));
55                } catch (Exception e) {
56                    p("EventPublisher exception: " + e);
57                    return;
58                }
59            }
60        }
61
62    }
63
64    /**
65     * Main program.
66     */
67    public static void main(String args[]) {
68        if (args.length > 0) {
69            TESTER_COUNT = Integer.parseInt(args[0]);
70        }
71        if (args.length == 3) {
72            host = args[1];
73            port = Integer.parseInt(args[2]);
74        }
75
76        for (int i = 0; i < TESTER_COUNT; i++) {
77            new StressTester().run();
78        }
79
80    }
81}
82
83/*
84 * $Log: StressTester.java,v $
85 * Revision 1.2  2007/11/09 13:16:57  justb
86 * use Rand from util package (and add Rand.java to pushlet client jar)
87 *
88 * Revision 1.1  2005/02/28 17:16:58  justb
89 * simple stress tester
90 *
91 *
92 */
93