1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.apache.commons.httpclient;
33
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.lang.ref.Reference;
38 import java.lang.ref.ReferenceQueue;
39 import java.lang.ref.WeakReference;
40 import java.net.InetAddress;
41 import java.net.SocketException;
42 import java.util.ArrayList;
43 import java.util.HashMap;
44 import java.util.Iterator;
45 import java.util.LinkedList;
46 import java.util.Map;
47 import java.util.WeakHashMap;
48
49 import org.apache.commons.httpclient.protocol.Protocol;
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52
53 /***
54 * Manages a set of HttpConnections for various HostConfigurations.
55 *
56 * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
57 * @author Eric Johnson
58 * @author <a href="mailto:mbowler@GargoyleSoftware.com">Mike Bowler</a>
59 * @author Carl A. Dunham
60 *
61 * @since 2.0
62 */
63 public class MultiThreadedHttpConnectionManager implements HttpConnectionManager {
64
65
66 /*** Log object for this class. */
67 private static final Log LOG = LogFactory.getLog(MultiThreadedHttpConnectionManager.class);
68
69 /*** The default maximum number of connections allowed per host */
70 public static final int DEFAULT_MAX_HOST_CONNECTIONS = 2;
71
72 /*** The default maximum number of connections allowed overall */
73 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 20;
74
75 /***
76 * A mapping from Reference to ConnectionSource. Used to reclaim resources when connections
77 * are lost to the garbage collector.
78 */
79 private static final Map REFERENCE_TO_CONNECTION_SOURCE = new HashMap();
80
81 /***
82 * The reference queue used to track when HttpConnections are lost to the
83 * garbage collector
84 */
85 private static final ReferenceQueue REFERENCE_QUEUE = new ReferenceQueue();
86
87 /***
88 * The thread responsible for handling lost connections.
89 */
90 private static ReferenceQueueThread REFERENCE_QUEUE_THREAD;
91
92 /***
93 * Holds references to all active instances of this class.
94 */
95 private static WeakHashMap ALL_CONNECTION_MANAGERS = new WeakHashMap();
96
97 /***
98 * Shuts down and cleans up resources used by all instances of
99 * MultiThreadedHttpConnectionManager. All static resources are released, all threads are
100 * stopped, and {@link #shutdown()} is called on all live instaces of
101 * MultiThreadedHttpConnectionManager.
102 *
103 * @see #shutdown()
104 */
105 public static void shutdownAll() {
106
107 synchronized (REFERENCE_TO_CONNECTION_SOURCE) {
108
109 synchronized (ALL_CONNECTION_MANAGERS) {
110 Iterator connIter = ALL_CONNECTION_MANAGERS.keySet().iterator();
111 while (connIter.hasNext()) {
112 MultiThreadedHttpConnectionManager connManager =
113 (MultiThreadedHttpConnectionManager) connIter.next();
114 connIter.remove();
115 connManager.shutdown();
116 }
117 }
118
119
120 if (REFERENCE_QUEUE_THREAD != null) {
121 REFERENCE_QUEUE_THREAD.shutdown();
122 REFERENCE_QUEUE_THREAD = null;
123 }
124 REFERENCE_TO_CONNECTION_SOURCE.clear();
125 }
126 }
127
128 /***
129 * Stores the reference to the given connection along with the hostConfig and connection pool.
130 * These values will be used to reclaim resources if the connection is lost to the garbage
131 * collector. This method should be called before a connection is released from the connection
132 * manager.
133 *
134 * <p>A static reference to the connection manager will also be stored. To ensure that
135 * the connection manager can be GCed {@link #removeReferenceToConnection(HttpConnection)}
136 * should be called for all connections that the connection manager is storing a reference
137 * to.</p>
138 *
139 * @param connection the connection to create a reference for
140 * @param hostConfiguration the connection's host config
141 * @param connectionPool the connection pool that created the connection
142 *
143 * @see #removeReferenceToConnection(HttpConnection)
144 */
145 private static void storeReferenceToConnection(
146 HttpConnectionWithReference connection,
147 HostConfiguration hostConfiguration,
148 ConnectionPool connectionPool
149 ) {
150
151 ConnectionSource source = new ConnectionSource();
152 source.connectionPool = connectionPool;
153 source.hostConfiguration = hostConfiguration;
154
155 synchronized (REFERENCE_TO_CONNECTION_SOURCE) {
156
157
158 if (REFERENCE_QUEUE_THREAD == null) {
159 REFERENCE_QUEUE_THREAD = new ReferenceQueueThread();
160 REFERENCE_QUEUE_THREAD.start();
161 }
162
163 REFERENCE_TO_CONNECTION_SOURCE.put(
164 connection.reference,
165 source
166 );
167 }
168 }
169
170 /***
171 * Closes and releases all connections currently checked out of the given connection pool.
172 * @param connectionPool the connection pool to shutdown the connections for
173 */
174 private static void shutdownCheckedOutConnections(ConnectionPool connectionPool) {
175
176
177 ArrayList connectionsToClose = new ArrayList();
178
179 synchronized (REFERENCE_TO_CONNECTION_SOURCE) {
180
181 Iterator referenceIter = REFERENCE_TO_CONNECTION_SOURCE.keySet().iterator();
182 while (referenceIter.hasNext()) {
183 Reference ref = (Reference) referenceIter.next();
184 ConnectionSource source =
185 (ConnectionSource) REFERENCE_TO_CONNECTION_SOURCE.get(ref);
186 if (source.connectionPool == connectionPool) {
187 referenceIter.remove();
188 HttpConnection connection = (HttpConnection) ref.get();
189 if (connection != null) {
190 connectionsToClose.add(connection);
191 }
192 }
193 }
194 }
195
196
197
198 for (Iterator i = connectionsToClose.iterator(); i.hasNext();) {
199 HttpConnection connection = (HttpConnection) i.next();
200 connection.close();
201
202
203 connection.setHttpConnectionManager(null);
204 connection.releaseConnection();
205 }
206 }
207
208 /***
209 * Removes the reference being stored for the given connection. This method should be called
210 * when the connection manager again has a direct reference to the connection.
211 *
212 * @param connection the connection to remove the reference for
213 *
214 * @see #storeReferenceToConnection(HttpConnection, HostConfiguration, ConnectionPool)
215 */
216 private static void removeReferenceToConnection(HttpConnectionWithReference connection) {
217
218 synchronized (REFERENCE_TO_CONNECTION_SOURCE) {
219 REFERENCE_TO_CONNECTION_SOURCE.remove(connection.reference);
220 }
221 }
222
223
224 /*** Maximum number of connections allowed per host */
225 private int maxHostConnections = DEFAULT_MAX_HOST_CONNECTIONS;
226
227 /*** Maximum number of connections allowed overall */
228 private int maxTotalConnections = DEFAULT_MAX_TOTAL_CONNECTIONS;
229
230 /*** The value to set when calling setStaleCheckingEnabled() on each connection */
231 private boolean connectionStaleCheckingEnabled = true;
232
233 private boolean shutdown = false;
234
235 /*** Connection Pool */
236 private ConnectionPool connectionPool;
237
238 /***
239 * No-args constructor
240 */
241 public MultiThreadedHttpConnectionManager() {
242 this.connectionPool = new ConnectionPool();
243 synchronized(ALL_CONNECTION_MANAGERS) {
244 ALL_CONNECTION_MANAGERS.put(this, null);
245 }
246 }
247
248 /***
249 * Shuts down the connection manager and releases all resources. All connections associated
250 * with this class will be closed and released.
251 *
252 * <p>The connection manager can no longer be used once shutdown.
253 *
254 * <p>Calling this method more than once will have no effect.
255 */
256 public synchronized void shutdown() {
257 synchronized (connectionPool) {
258 if (!shutdown) {
259 shutdown = true;
260 connectionPool.shutdown();
261 }
262 }
263 }
264
265 /***
266 * Gets the staleCheckingEnabled value to be set on HttpConnections that are created.
267 *
268 * @return <code>true</code> if stale checking will be enabled on HttpConections
269 *
270 * @see HttpConnection#isStaleCheckingEnabled()
271 */
272 public boolean isConnectionStaleCheckingEnabled() {
273 return connectionStaleCheckingEnabled;
274 }
275
276 /***
277 * Sets the staleCheckingEnabled value to be set on HttpConnections that are created.
278 *
279 * @param connectionStaleCheckingEnabled <code>true</code> if stale checking will be enabled
280 * on HttpConections
281 *
282 * @see HttpConnection#setStaleCheckingEnabled(boolean)
283 */
284 public void setConnectionStaleCheckingEnabled(boolean connectionStaleCheckingEnabled) {
285 this.connectionStaleCheckingEnabled = connectionStaleCheckingEnabled;
286 }
287
288 /***
289 * Sets the maximum number of connections allowed for a given
290 * HostConfiguration. Per RFC 2616 section 8.1.4, this value defaults to 2.
291 *
292 * @param maxHostConnections the number of connections allowed for each
293 * hostConfiguration
294 */
295 public void setMaxConnectionsPerHost(int maxHostConnections) {
296 this.maxHostConnections = maxHostConnections;
297 }
298
299 /***
300 * Gets the maximum number of connections allowed for a given
301 * hostConfiguration.
302 *
303 * @return The maximum number of connections allowed for a given
304 * hostConfiguration.
305 */
306 public int getMaxConnectionsPerHost() {
307 return maxHostConnections;
308 }
309
310 /***
311 * Sets the maximum number of connections allowed in the system.
312 *
313 * @param maxTotalConnections the maximum number of connections allowed
314 */
315 public void setMaxTotalConnections(int maxTotalConnections) {
316 this.maxTotalConnections = maxTotalConnections;
317 }
318
319 /***
320 * Gets the maximum number of connections allowed in the system.
321 *
322 * @return The maximum number of connections allowed
323 */
324 public int getMaxTotalConnections() {
325 return maxTotalConnections;
326 }
327
328 /***
329 * @see HttpConnectionManager#getConnection(HostConfiguration)
330 */
331 public HttpConnection getConnection(HostConfiguration hostConfiguration) {
332
333 while (true) {
334 try {
335 return getConnection(hostConfiguration, 0);
336 } catch (HttpException e) {
337
338
339
340 LOG.debug(
341 "Unexpected exception while waiting for connection",
342 e
343 );
344 };
345 }
346 }
347
348 /***
349 * @see HttpConnectionManager#getConnection(HostConfiguration, long)
350 */
351 public HttpConnection getConnection(HostConfiguration hostConfiguration,
352 long timeout) throws HttpException {
353
354 LOG.trace("enter HttpConnectionManager.getConnection(HostConfiguration, long)");
355
356 if (hostConfiguration == null) {
357 throw new IllegalArgumentException("hostConfiguration is null");
358 }
359
360 if (LOG.isDebugEnabled()) {
361 LOG.debug("HttpConnectionManager.getConnection: config = "
362 + hostConfiguration + ", timeout = " + timeout);
363 }
364
365 final HttpConnection conn = doGetConnection(hostConfiguration, timeout);
366
367
368
369 return new HttpConnectionAdapter(conn);
370 }
371
372 /***
373 * Gets a connection or waits if one is not available. A connection is
374 * available if one exists that is not being used or if fewer than
375 * maxHostConnections have been created in the connectionPool, and fewer
376 * than maxTotalConnections have been created in all connectionPools.
377 *
378 * @param hostConfiguration The host configuration.
379 * @param timeout the number of milliseconds to wait for a connection, 0 to
380 * wait indefinitely
381 *
382 * @return HttpConnection an available connection
383 *
384 * @throws HttpException if a connection does not become available in
385 * 'timeout' milliseconds
386 */
387 private HttpConnection doGetConnection(HostConfiguration hostConfiguration,
388 long timeout) throws HttpException {
389
390 HttpConnection connection = null;
391
392 synchronized (connectionPool) {
393
394
395
396 hostConfiguration = new HostConfiguration(hostConfiguration);
397 HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration);
398 WaitingThread waitingThread = null;
399
400 boolean useTimeout = (timeout > 0);
401 long timeToWait = timeout;
402 long startWait = 0;
403 long endWait = 0;
404
405 while (connection == null) {
406
407 if (shutdown) {
408 throw new IllegalStateException("Connection factory has been shutdown.");
409 }
410
411
412
413 if (hostPool.freeConnections.size() > 0) {
414 connection = connectionPool.getFreeConnection(hostConfiguration);
415
416
417
418 } else if ((hostPool.numConnections < maxHostConnections)
419 && (connectionPool.numConnections < maxTotalConnections)) {
420
421 connection = connectionPool.createConnection(hostConfiguration);
422
423
424
425
426 } else if ((hostPool.numConnections < maxHostConnections)
427 && (connectionPool.freeConnections.size() > 0)) {
428
429 connectionPool.deleteLeastUsedConnection();
430 connection = connectionPool.createConnection(hostConfiguration);
431
432
433
434
435 } else {
436
437
438
439 try {
440
441 if (useTimeout && timeToWait <= 0) {
442 throw new HttpException("Timeout waiting for connection");
443 }
444
445 if (LOG.isDebugEnabled()) {
446 LOG.debug("Unable to get a connection, waiting..., hostConfig=" + hostConfiguration);
447 }
448
449 if (waitingThread == null) {
450 waitingThread = new WaitingThread();
451 waitingThread.hostConnectionPool = hostPool;
452 waitingThread.thread = Thread.currentThread();
453 }
454
455 if (useTimeout) {
456 startWait = System.currentTimeMillis();
457 }
458
459 hostPool.waitingThreads.addLast(waitingThread);
460 connectionPool.waitingThreads.addLast(waitingThread);
461 connectionPool.wait(timeToWait);
462
463
464
465 hostPool.waitingThreads.remove(waitingThread);
466 connectionPool.waitingThreads.remove(waitingThread);
467 } catch (InterruptedException e) {
468
469 } finally {
470 if (useTimeout) {
471 endWait = System.currentTimeMillis();
472 timeToWait -= (endWait - startWait);
473 }
474 }
475 }
476 }
477 }
478 return connection;
479 }
480
481 /***
482 * Gets the number of connections in use for this configuration.
483 *
484 * @param hostConfiguration the key that connections are tracked on
485 * @return the number of connections in use
486 */
487 public int getConnectionsInUse(HostConfiguration hostConfiguration) {
488 synchronized (connectionPool) {
489 HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration);
490 return hostPool.numConnections;
491 }
492 }
493
494 /***
495 * Gets the total number of connections in use.
496 *
497 * @return the total number of connections in use
498 */
499 public int getConnectionsInUse() {
500 synchronized (connectionPool) {
501 return connectionPool.numConnections;
502 }
503 }
504
505 /***
506 * Make the given HttpConnection available for use by other requests.
507 * If another thread is blocked in getConnection() that could use this
508 * connection, it will be woken up.
509 *
510 * @param conn the HttpConnection to make available.
511 */
512 public void releaseConnection(HttpConnection conn) {
513 LOG.trace("enter HttpConnectionManager.releaseConnection(HttpConnection)");
514
515 if (conn instanceof HttpConnectionAdapter) {
516
517 conn = ((HttpConnectionAdapter) conn).getWrappedConnection();
518 } else {
519
520
521 }
522
523
524 SimpleHttpConnectionManager.finishLastResponse(conn);
525
526 connectionPool.freeConnection(conn);
527 }
528
529 /***
530 * Gets the host configuration for a connection.
531 * @param conn the connection to get the configuration of
532 * @return a new HostConfiguration
533 */
534 private HostConfiguration configurationForConnection(HttpConnection conn) {
535
536 HostConfiguration connectionConfiguration = new HostConfiguration();
537
538 connectionConfiguration.setHost(
539 conn.getHost(),
540 conn.getVirtualHost(),
541 conn.getPort(),
542 conn.getProtocol()
543 );
544 if (conn.getLocalAddress() != null) {
545 connectionConfiguration.setLocalAddress(conn.getLocalAddress());
546 }
547 if (conn.getProxyHost() != null) {
548 connectionConfiguration.setProxy(conn.getProxyHost(), conn.getProxyPort());
549 }
550
551 return connectionConfiguration;
552 }
553
554
555 /***
556 * Global Connection Pool, including per-host pools
557 */
558 private class ConnectionPool {
559
560 /*** The list of free connections */
561 private LinkedList freeConnections = new LinkedList();
562
563 /*** The list of WaitingThreads waiting for a connection */
564 private LinkedList waitingThreads = new LinkedList();
565
566 /***
567 * Map where keys are {@link HostConfiguration}s and values are {@link
568 * HostConnectionPool}s
569 */
570 private final Map mapHosts = new HashMap();
571
572 /*** The number of created connections */
573 private int numConnections = 0;
574
575 /***
576 * Cleans up all connection pool resources.
577 */
578 public synchronized void shutdown() {
579
580
581 Iterator iter = freeConnections.iterator();
582 while (iter.hasNext()) {
583 HttpConnection conn = (HttpConnection) iter.next();
584 iter.remove();
585 conn.close();
586 }
587
588
589 shutdownCheckedOutConnections(this);
590
591
592 iter = waitingThreads.iterator();
593 while (iter.hasNext()) {
594 WaitingThread waiter = (WaitingThread) iter.next();
595 iter.remove();
596 waiter.thread.interrupt();
597 }
598
599
600 mapHosts.clear();
601 }
602
603 /***
604 * Creates a new connection and returns is for use of the calling method.
605 *
606 * @param hostConfiguration the configuration for the connection
607 * @return a new connection or <code>null</code> if none are available
608 */
609 public synchronized HttpConnection createConnection(HostConfiguration hostConfiguration) {
610
611 HttpConnectionWithReference connection = null;
612
613 HostConnectionPool hostPool = getHostPool(hostConfiguration);
614
615 if ((hostPool.numConnections < getMaxConnectionsPerHost())
616 && (numConnections < getMaxTotalConnections())) {
617
618 if (LOG.isDebugEnabled()) {
619 LOG.debug("Allocating new connection, hostConfig=" + hostConfiguration);
620 }
621 connection = new HttpConnectionWithReference(hostConfiguration);
622 connection.setStaleCheckingEnabled(connectionStaleCheckingEnabled);
623 connection.setHttpConnectionManager(MultiThreadedHttpConnectionManager.this);
624 numConnections++;
625 hostPool.numConnections++;
626
627
628
629 storeReferenceToConnection(connection, hostConfiguration, this);
630
631 } else if (LOG.isDebugEnabled()) {
632 if (hostPool.numConnections >= getMaxConnectionsPerHost()) {
633 LOG.debug("No connection allocated, host pool has already reached "
634 + "maxConnectionsPerHost, hostConfig=" + hostConfiguration
635 + ", maxConnectionsPerhost=" + getMaxConnectionsPerHost());
636 } else {
637 LOG.debug("No connection allocated, maxTotalConnections reached, "
638 + "maxTotalConnections=" + getMaxTotalConnections());
639 }
640 }
641
642 return connection;
643 }
644
645 /***
646 * Handles cleaning up for a lost connection with the given config. Decrements any
647 * connection counts and notifies waiting threads, if appropriate.
648 *
649 * @param config the host configuration of the connection that was lost
650 */
651 public synchronized void handleLostConnection(HostConfiguration config) {
652 HostConnectionPool hostPool = getHostPool(config);
653 hostPool.numConnections--;
654
655 numConnections--;
656 notifyWaitingThread(config);
657 }
658
659 /***
660 * Get the pool (list) of connections available for the given hostConfig.
661 *
662 * @param hostConfiguration the configuraton for the connection pool
663 * @return a pool (list) of connections available for the given config
664 */
665 public synchronized HostConnectionPool getHostPool(HostConfiguration hostConfiguration) {
666 LOG.trace("enter HttpConnectionManager.ConnectionPool.getHostPool(HostConfiguration)");
667
668
669 HostConnectionPool listConnections = (HostConnectionPool)
670 mapHosts.get(hostConfiguration);
671 if (listConnections == null) {
672
673 listConnections = new HostConnectionPool();
674 listConnections.hostConfiguration = hostConfiguration;
675 mapHosts.put(hostConfiguration, listConnections);
676 }
677
678 return listConnections;
679 }
680
681 /***
682 * If available, get a free connection for this host
683 *
684 * @param hostConfiguration the configuraton for the connection pool
685 * @return an available connection for the given config
686 */
687 public synchronized HttpConnection getFreeConnection(HostConfiguration hostConfiguration) {
688
689 HttpConnectionWithReference connection = null;
690
691 HostConnectionPool hostPool = getHostPool(hostConfiguration);
692
693 if (hostPool.freeConnections.size() > 0) {
694 connection = (HttpConnectionWithReference) hostPool.freeConnections.removeFirst();
695 freeConnections.remove(connection);
696
697
698 storeReferenceToConnection(connection, hostConfiguration, this);
699 if (LOG.isDebugEnabled()) {
700 LOG.debug("Getting free connection, hostConfig=" + hostConfiguration);
701 }
702 } else if (LOG.isDebugEnabled()) {
703 LOG.debug("There were no free connections to get, hostConfig="
704 + hostConfiguration);
705 }
706 return connection;
707 }
708
709 /***
710 * Close and delete an old, unused connection to make room for a new one.
711 */
712 public synchronized void deleteLeastUsedConnection() {
713
714 HttpConnection connection = (HttpConnection) freeConnections.removeFirst();
715
716 if (connection != null) {
717 HostConfiguration connectionConfiguration = configurationForConnection(connection);
718
719 if (LOG.isDebugEnabled()) {
720 LOG.debug("Reclaiming unused connection, hostConfig="
721 + connectionConfiguration);
722 }
723
724 connection.close();
725
726 HostConnectionPool hostPool = getHostPool(connectionConfiguration);
727
728 hostPool.freeConnections.remove(connection);
729 hostPool.numConnections--;
730 numConnections--;
731 } else if (LOG.isDebugEnabled()) {
732 LOG.debug("Attempted to reclaim an unused connection but there were none.");
733 }
734 }
735
736 /***
737 * Notifies a waiting thread that a connection for the given configuration is
738 * available.
739 * @param configuration the host config to use for notifying
740 * @see #notifyWaitingThread(HostConnectionPool)
741 */
742 public synchronized void notifyWaitingThread(HostConfiguration configuration) {
743 notifyWaitingThread(getHostPool(configuration));
744 }
745
746 /***
747 * Notifies a waiting thread that a connection for the given configuration is
748 * available. This will wake a thread witing in tis hostPool or if there is not
749 * one a thread in the ConnectionPool will be notified.
750 *
751 * @param hostPool the host pool to use for notifying
752 */
753 public synchronized void notifyWaitingThread(HostConnectionPool hostPool) {
754
755
756
757
758 WaitingThread waitingThread = null;
759
760 if (hostPool.waitingThreads.size() > 0) {
761 if (LOG.isDebugEnabled()) {
762 LOG.debug("Notifying thread waiting on host pool, hostConfig="
763 + hostPool.hostConfiguration);
764 }
765 waitingThread = (WaitingThread) hostPool.waitingThreads.removeFirst();
766 waitingThreads.remove(waitingThread);
767 } else if (waitingThreads.size() > 0) {
768 if (LOG.isDebugEnabled()) {
769 LOG.debug("No-one waiting on host pool, notifying next waiting thread.");
770 }
771 waitingThread = (WaitingThread) waitingThreads.removeFirst();
772 waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread);
773 } else if (LOG.isDebugEnabled()) {
774 LOG.debug("Notifying no-one, there are no waiting threads");
775 }
776
777 if (waitingThread != null) {
778 waitingThread.thread.interrupt();
779 }
780 }
781
782 /***
783 * Marks the given connection as free.
784 * @param conn a connection that is no longer being used
785 */
786 public void freeConnection(HttpConnection conn) {
787
788 HostConfiguration connectionConfiguration = configurationForConnection(conn);
789
790 if (LOG.isDebugEnabled()) {
791 LOG.debug("Freeing connection, hostConfig=" + connectionConfiguration);
792 }
793
794 synchronized (this) {
795
796 if (shutdown) {
797
798
799 conn.close();
800 return;
801 }
802
803 HostConnectionPool hostPool = getHostPool(connectionConfiguration);
804
805
806 hostPool.freeConnections.add(conn);
807 if (hostPool.numConnections == 0) {
808
809 LOG.error("Host connection pool not found, hostConfig="
810 + connectionConfiguration);
811 hostPool.numConnections = 1;
812 }
813
814 freeConnections.add(conn);
815
816
817 removeReferenceToConnection((HttpConnectionWithReference) conn);
818 if (numConnections == 0) {
819
820 LOG.error("Host connection pool not found, hostConfig="
821 + connectionConfiguration);
822 numConnections = 1;
823 }
824
825 notifyWaitingThread(hostPool);
826 }
827 }
828 }
829
830 /***
831 * A simple struct-like class to combine the objects needed to release a connection's
832 * resources when claimed by the garbage collector.
833 */
834 private static class ConnectionSource {
835
836 /*** The connection pool that created the connection */
837 public ConnectionPool connectionPool;
838
839 /*** The connection's host configuration */
840 public HostConfiguration hostConfiguration;
841 }
842
843 /***
844 * A simple struct-like class to combine the connection list and the count
845 * of created connections.
846 */
847 private static class HostConnectionPool {
848 /*** The hostConfig this pool is for */
849 public HostConfiguration hostConfiguration;
850
851 /*** The list of free connections */
852 public LinkedList freeConnections = new LinkedList();
853
854 /*** The list of WaitingThreads for this host */
855 public LinkedList waitingThreads = new LinkedList();
856
857 /*** The number of created connections */
858 public int numConnections = 0;
859 }
860
861 /***
862 * A simple struct-like class to combine the waiting thread and the connection
863 * pool it is waiting on.
864 */
865 private static class WaitingThread {
866 /*** The thread that is waiting for a connection */
867 public Thread thread;
868
869 /*** The connection pool the thread is waiting for */
870 public HostConnectionPool hostConnectionPool;
871 }
872
873 /***
874 * A thread for listening for HttpConnections reclaimed by the garbage
875 * collector.
876 */
877 private static class ReferenceQueueThread extends Thread {
878
879 private boolean shutdown = false;
880
881 /***
882 * Create an instance and make this a daemon thread.
883 */
884 public ReferenceQueueThread() {
885 setDaemon(true);
886 setName("MultiThreadedHttpConnectionManager cleanup");
887 }
888
889 public void shutdown() {
890 this.shutdown = true;
891 }
892
893 /***
894 * Handles cleaning up for the given connection reference.
895 *
896 * @param ref the reference to clean up
897 */
898 private void handleReference(Reference ref) {
899
900 ConnectionSource source = null;
901
902 synchronized (REFERENCE_TO_CONNECTION_SOURCE) {
903 source = (ConnectionSource) REFERENCE_TO_CONNECTION_SOURCE.remove(ref);
904 }
905
906
907 if (source != null) {
908 if (LOG.isDebugEnabled()) {
909 LOG.debug(
910 "Connection reclaimed by garbage collector, hostConfig="
911 + source.hostConfiguration);
912 }
913
914 source.connectionPool.handleLostConnection(source.hostConfiguration);
915 }
916 }
917
918 /***
919 * Start execution.
920 */
921 public void run() {
922 while (!shutdown) {
923 try {
924
925
926
927 Reference ref = REFERENCE_QUEUE.remove(1000);
928 if (ref != null) {
929 handleReference(ref);
930 }
931 } catch (InterruptedException e) {
932 LOG.debug("ReferenceQueueThread interrupted", e);
933 }
934 }
935 }
936
937 }
938
939 /***
940 * A connection that keeps a reference to itself.
941 */
942 private static class HttpConnectionWithReference extends HttpConnection {
943
944 public WeakReference reference = new WeakReference(this, REFERENCE_QUEUE);
945
946 /***
947 * @param hostConfiguration
948 */
949 public HttpConnectionWithReference(HostConfiguration hostConfiguration) {
950 super(hostConfiguration);
951 }
952
953 }
954
955 /***
956 * An HttpConnection wrapper that ensures a connection cannot be used
957 * once released.
958 */
959 private static class HttpConnectionAdapter extends HttpConnection {
960
961
962 private HttpConnection wrappedConnection;
963
964 /***
965 * Creates a new HttpConnectionAdapter.
966 * @param connection the connection to be wrapped
967 */
968 public HttpConnectionAdapter(HttpConnection connection) {
969 super(connection.getHost(), connection.getPort(), connection.getProtocol());
970 this.wrappedConnection = connection;
971 }
972
973 /***
974 * Tests if the wrapped connection is still available.
975 * @return boolean
976 */
977 protected boolean hasConnection() {
978 return wrappedConnection != null;
979 }
980
981 /***
982 * @return HttpConnection
983 */
984 HttpConnection getWrappedConnection() {
985 return wrappedConnection;
986 }
987
988 public void close() {
989 if (hasConnection()) {
990 wrappedConnection.close();
991 } else {
992
993 }
994 }
995
996 public InetAddress getLocalAddress() {
997 if (hasConnection()) {
998 return wrappedConnection.getLocalAddress();
999 } else {
1000 return null;
1001 }
1002 }
1003
1004 public boolean isStaleCheckingEnabled() {
1005 if (hasConnection()) {
1006 return wrappedConnection.isStaleCheckingEnabled();
1007 } else {
1008 return false;
1009 }
1010 }
1011
1012 public void setLocalAddress(InetAddress localAddress) {
1013 if (hasConnection()) {
1014 wrappedConnection.setLocalAddress(localAddress);
1015 } else {
1016 throw new IllegalStateException("Connection has been released");
1017 }
1018 }
1019
1020 public void setStaleCheckingEnabled(boolean staleCheckEnabled) {
1021 if (hasConnection()) {
1022 wrappedConnection.setStaleCheckingEnabled(staleCheckEnabled);
1023 } else {
1024 throw new IllegalStateException("Connection has been released");
1025 }
1026 }
1027
1028 public String getHost() {
1029 if (hasConnection()) {
1030 return wrappedConnection.getHost();
1031 } else {
1032 return null;
1033 }
1034 }
1035
1036 public HttpConnectionManager getHttpConnectionManager() {
1037 if (hasConnection()) {
1038 return wrappedConnection.getHttpConnectionManager();
1039 } else {
1040 return null;
1041 }
1042 }
1043
1044 public InputStream getLastResponseInputStream() {
1045 if (hasConnection()) {
1046 return wrappedConnection.getLastResponseInputStream();
1047 } else {
1048 return null;
1049 }
1050 }
1051
1052 public int getPort() {
1053 if (hasConnection()) {
1054 return wrappedConnection.getPort();
1055 } else {
1056 return -1;
1057 }
1058 }
1059
1060 public Protocol getProtocol() {
1061 if (hasConnection()) {
1062 return wrappedConnection.getProtocol();
1063 } else {
1064 return null;
1065 }
1066 }
1067
1068 public String getProxyHost() {
1069 if (hasConnection()) {
1070 return wrappedConnection.getProxyHost();
1071 } else {
1072 return null;
1073 }
1074 }
1075
1076 public int getProxyPort() {
1077 if (hasConnection()) {
1078 return wrappedConnection.getProxyPort();
1079 } else {
1080 return -1;
1081 }
1082 }
1083
1084 public OutputStream getRequestOutputStream()
1085 throws IOException, IllegalStateException {
1086 if (hasConnection()) {
1087 return wrappedConnection.getRequestOutputStream();
1088 } else {
1089 return null;
1090 }
1091 }
1092
1093 public OutputStream getRequestOutputStream(boolean useChunking)
1094 throws IOException, IllegalStateException {
1095 if (hasConnection()) {
1096 return wrappedConnection.getRequestOutputStream(useChunking);
1097 } else {
1098 return null;
1099 }
1100 }
1101
1102 public InputStream getResponseInputStream()
1103 throws IOException, IllegalStateException {
1104 if (hasConnection()) {
1105 return wrappedConnection.getResponseInputStream();
1106 } else {
1107 return null;
1108 }
1109 }
1110
1111 public InputStream getResponseInputStream(HttpMethod method)
1112 throws IOException, IllegalStateException {
1113 if (hasConnection()) {
1114 return wrappedConnection.getResponseInputStream(method);
1115 } else {
1116 return null;
1117 }
1118 }
1119
1120 public boolean isOpen() {
1121 if (hasConnection()) {
1122 return wrappedConnection.isOpen();
1123 } else {
1124 return false;
1125 }
1126 }
1127
1128 public boolean isProxied() {
1129 if (hasConnection()) {
1130 return wrappedConnection.isProxied();
1131 } else {
1132 return false;
1133 }
1134 }
1135
1136 public boolean isResponseAvailable() throws IOException {
1137 if (hasConnection()) {
1138 return wrappedConnection.isResponseAvailable();
1139 } else {
1140 return false;
1141 }
1142 }
1143
1144 public boolean isResponseAvailable(int timeout) throws IOException {
1145 if (hasConnection()) {
1146 return wrappedConnection.isResponseAvailable(timeout);
1147 } else {
1148 return false;
1149 }
1150 }
1151
1152 public boolean isSecure() {
1153 if (hasConnection()) {
1154 return wrappedConnection.isSecure();
1155 } else {
1156 return false;
1157 }
1158 }
1159
1160 public boolean isTransparent() {
1161 if (hasConnection()) {
1162 return wrappedConnection.isTransparent();
1163 } else {
1164 return false;
1165 }
1166 }
1167
1168 public void open() throws IOException {
1169 if (hasConnection()) {
1170 wrappedConnection.open();
1171 } else {
1172 throw new IllegalStateException("Connection has been released");
1173 }
1174 }
1175
1176 public void print(String data)
1177 throws IOException, IllegalStateException, HttpRecoverableException {
1178 if (hasConnection()) {
1179 wrappedConnection.print(data);
1180 } else {
1181 throw new IllegalStateException("Connection has been released");
1182 }
1183 }
1184
1185 public void printLine()
1186 throws IOException, IllegalStateException, HttpRecoverableException {
1187 if (hasConnection()) {
1188 wrappedConnection.printLine();
1189 } else {
1190 throw new IllegalStateException("Connection has been released");
1191 }
1192 }
1193
1194 public void printLine(String data)
1195 throws IOException, IllegalStateException, HttpRecoverableException {
1196 if (hasConnection()) {
1197 wrappedConnection.printLine(data);
1198 } else {
1199 throw new IllegalStateException("Connection has been released");
1200 }
1201 }
1202
1203 public String readLine() throws IOException, IllegalStateException {
1204 if (hasConnection()) {
1205 return wrappedConnection.readLine();
1206 } else {
1207 throw new IllegalStateException("Connection has been released");
1208 }
1209 }
1210
1211 public void releaseConnection() {
1212 if (hasConnection()) {
1213 HttpConnection wrappedConnection = this.wrappedConnection;
1214 this.wrappedConnection = null;
1215 wrappedConnection.releaseConnection();
1216 } else {
1217
1218 }
1219 }
1220
1221 public void setConnectionTimeout(int timeout) {
1222 if (hasConnection()) {
1223 wrappedConnection.setConnectionTimeout(timeout);
1224 } else {
1225
1226 }
1227 }
1228
1229 public void setHost(String host) throws IllegalStateException {
1230 if (hasConnection()) {
1231 wrappedConnection.setHost(host);
1232 } else {
1233
1234 }
1235 }
1236
1237 public void setHttpConnectionManager(HttpConnectionManager httpConnectionManager) {
1238 if (hasConnection()) {
1239 wrappedConnection.setHttpConnectionManager(httpConnectionManager);
1240 } else {
1241
1242 }
1243 }
1244
1245 public void setLastResponseInputStream(InputStream inStream) {
1246 if (hasConnection()) {
1247 wrappedConnection.setLastResponseInputStream(inStream);
1248 } else {
1249
1250 }
1251 }
1252
1253 public void setPort(int port) throws IllegalStateException {
1254 if (hasConnection()) {
1255 wrappedConnection.setPort(port);
1256 } else {
1257
1258 }
1259 }
1260
1261 public void setProtocol(Protocol protocol) {
1262 if (hasConnection()) {
1263 wrappedConnection.setProtocol(protocol);
1264 } else {
1265
1266 }
1267 }
1268
1269 public void setProxyHost(String host) throws IllegalStateException {
1270 if (hasConnection()) {
1271 wrappedConnection.setProxyHost(host);
1272 } else {
1273
1274 }
1275 }
1276
1277 public void setProxyPort(int port) throws IllegalStateException {
1278 if (hasConnection()) {
1279 wrappedConnection.setProxyPort(port);
1280 } else {
1281
1282 }
1283 }
1284
1285 public void setSecure(boolean secure) throws IllegalStateException {
1286 if (hasConnection()) {
1287 wrappedConnection.setSecure(secure);
1288 } else {
1289
1290 }
1291 }
1292
1293 public void setSoTimeout(int timeout)
1294 throws SocketException, IllegalStateException {
1295 if (hasConnection()) {
1296 wrappedConnection.setSoTimeout(timeout);
1297 } else {
1298
1299 }
1300 }
1301
1302 public void shutdownOutput() {
1303 if (hasConnection()) {
1304 wrappedConnection.shutdownOutput();
1305 } else {
1306
1307 }
1308 }
1309
1310 public void tunnelCreated() throws IllegalStateException, IOException {
1311 if (hasConnection()) {
1312 wrappedConnection.tunnelCreated();
1313 } else {
1314
1315 }
1316 }
1317
1318 public void write(byte[] data, int offset, int length)
1319 throws IOException, IllegalStateException, HttpRecoverableException {
1320 if (hasConnection()) {
1321 wrappedConnection.write(data, offset, length);
1322 } else {
1323 throw new IllegalStateException("Connection has been released");
1324 }
1325 }
1326
1327 public void write(byte[] data)
1328 throws IOException, IllegalStateException, HttpRecoverableException {
1329 if (hasConnection()) {
1330 wrappedConnection.write(data);
1331 } else {
1332 throw new IllegalStateException("Connection has been released");
1333 }
1334 }
1335
1336 public void writeLine()
1337 throws IOException, IllegalStateException, HttpRecoverableException {
1338 if (hasConnection()) {
1339 wrappedConnection.writeLine();
1340 } else {
1341 throw new IllegalStateException("Connection has been released");
1342 }
1343 }
1344
1345 public void writeLine(byte[] data)
1346 throws IOException, IllegalStateException, HttpRecoverableException {
1347 if (hasConnection()) {
1348 wrappedConnection.writeLine(data);
1349 } else {
1350 throw new IllegalStateException("Connection has been released");
1351 }
1352 }
1353
1354 public void flushRequestOutputStream() throws IOException {
1355 if (hasConnection()) {
1356 wrappedConnection.flushRequestOutputStream();
1357 } else {
1358 throw new IllegalStateException("Connection has been released");
1359 }
1360 }
1361
1362 public int getSoTimeout() throws SocketException {
1363 if (hasConnection()) {
1364 return wrappedConnection.getSoTimeout();
1365 } else {
1366 throw new IllegalStateException("Connection has been released");
1367 }
1368 }
1369
1370 public String getVirtualHost() {
1371 if (hasConnection()) {
1372 return wrappedConnection.getVirtualHost();
1373 } else {
1374 throw new IllegalStateException("Connection has been released");
1375 }
1376 }
1377
1378 public void setVirtualHost(String host) throws IllegalStateException {
1379 if (hasConnection()) {
1380 wrappedConnection.setVirtualHost(host);
1381 } else {
1382 throw new IllegalStateException("Connection has been released");
1383 }
1384 }
1385
1386 public int getSendBufferSize() throws SocketException {
1387 if (hasConnection()) {
1388 return wrappedConnection.getSendBufferSize();
1389 } else {
1390 throw new IllegalStateException("Connection has been released");
1391 }
1392 }
1393
1394 public void setSendBufferSize(int sendBufferSize) throws SocketException {
1395 if (hasConnection()) {
1396 wrappedConnection.setSendBufferSize(sendBufferSize);
1397 } else {
1398 throw new IllegalStateException("Connection has been released");
1399 }
1400 }
1401
1402 }
1403
1404 }
1405