001    // Copyright 2004, 2005 The Apache Software Foundation
002    //
003    // Licensed under the Apache License, Version 2.0 (the "License");
004    // you may not use this file except in compliance with the License.
005    // You may obtain a copy of the License at
006    //
007    //     http://www.apache.org/licenses/LICENSE-2.0
008    //
009    // Unless required by applicable law or agreed to in writing, software
010    // distributed under the License is distributed on an "AS IS" BASIS,
011    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012    // See the License for the specific language governing permissions and
013    // limitations under the License.
014    
015    package org.apache.hivemind.impl.servicemodel;
016    
017    import java.util.ArrayList;
018    import java.util.List;
019    
020    import org.apache.hivemind.ApplicationRuntimeException;
021    import org.apache.hivemind.HiveMind;
022    import org.apache.hivemind.PoolManageable;
023    import org.apache.hivemind.events.RegistryShutdownListener;
024    import org.apache.hivemind.impl.ConstructableServicePoint;
025    import org.apache.hivemind.impl.ProxyUtils;
026    import org.apache.hivemind.internal.Module;
027    import org.apache.hivemind.service.ThreadCleanupListener;
028    import org.apache.hivemind.service.ThreadEventNotifier;
029    
030    /**
031     * Similar to the
032     * {@link org.apache.hivemind.impl.servicemodel.ThreadedServiceModel threaded service model},
033     * except that, once created, services are pooled for later use.
034     * 
035     * @author Howard Lewis Ship
036     */
037    public class PooledServiceModel extends AbstractServiceModelImpl
038    {
039        /**
040         * Name of a method in the deferred proxy that is used to obtain the constructed service.
041         */
042        protected static final String SERVICE_ACCESSOR_METHOD_NAME = "_service";
043    
044        private final Object _serviceProxy;
045    
046        private final ThreadEventNotifier _notifier;
047    
048        private final ThreadLocal _activeService = new ThreadLocal();
049    
050        private final List _servicePool = new ArrayList();
051    
052        /** @since 1.1 */
053    
054        private Class _serviceInterface;
055    
056        /**
057         * Shared, null implementation of PoolManageable.
058         */
059        private static final PoolManageable NULL_MANAGEABLE = new PoolManageable()
060        {
061            public void activateService()
062            {
063            }
064    
065            public void passivateService()
066            {
067            }
068        };
069    
070        private class PooledService implements ThreadCleanupListener
071        {
072            private Object _core;
073    
074            private PoolManageable _managed;
075    
076            /**
077             * @param core
078             *            the core service implementation, which may optionally implement
079             *            {@link PoolManageable}
080             */
081            PooledService(Object core)
082            {
083                _core = core;
084    
085                if (core instanceof PoolManageable)
086                    _managed = (PoolManageable) core;
087                else
088                    _managed = NULL_MANAGEABLE;
089            }
090    
091            public void threadDidCleanup()
092            {
093                unbindPooledServiceFromCurrentThread(this);
094            }
095    
096            void activate()
097            {
098                _managed.activateService();
099            }
100    
101            void passivate()
102            {
103                _managed.passivateService();
104            }
105    
106            /**
107             * Returns the configured service implementation.
108             */
109            public Object getService()
110            {
111                return _core;
112            }
113    
114        }
115    
116        public PooledServiceModel(ConstructableServicePoint servicePoint)
117        {
118            super(servicePoint);
119    
120            _serviceInterface = servicePoint.getServiceInterface();
121    
122            Module module = getServicePoint().getModule();
123    
124            _notifier = (ThreadEventNotifier) module.getService(
125                    HiveMind.THREAD_EVENT_NOTIFIER_SERVICE,
126                    ThreadEventNotifier.class);
127    
128            _serviceProxy = constructServiceProxy();
129        }
130    
131        public Object getService()
132        {
133            return _serviceProxy;
134        }
135    
136        /**
137         * Constructs the service proxy and returns it, wrapped in any interceptors.
138         */
139        private Object constructServiceProxy()
140        {
141            ConstructableServicePoint servicePoint = getServicePoint();
142    
143            if (_log.isDebugEnabled())
144                _log.debug("Creating PooledProxy for service " + servicePoint.getExtensionPointId());
145    
146            Object proxy = ProxyUtils.createDelegatingProxy(
147                    "PooledProxy",
148                    this,
149                    "getServiceImplementationForCurrentThread",
150                    servicePoint);
151    
152            Object intercepted = addInterceptors(proxy);
153    
154            RegistryShutdownListener outerProxy = ProxyUtils
155                    .createOuterProxy(intercepted, servicePoint);
156    
157            servicePoint.addRegistryShutdownListener(outerProxy);
158    
159            return outerProxy;
160        }
161    
162        public Object getServiceImplementationForCurrentThread()
163        {
164            PooledService pooled = (PooledService) _activeService.get();
165    
166            if (pooled == null)
167            {
168                pooled = obtainPooledService();
169    
170                pooled.activate();
171    
172                _notifier.addThreadCleanupListener(pooled);
173                _activeService.set(pooled);
174            }
175    
176            return pooled.getService();
177        }
178    
179        private PooledService obtainPooledService()
180        {
181            PooledService result = getServiceFromPool();
182    
183            if (result == null)
184                result = constructPooledService();
185    
186            return result;
187        }
188    
189        private synchronized PooledService getServiceFromPool()
190        {
191            int count = _servicePool.size();
192    
193            if (count == 0)
194                return null;
195    
196            return (PooledService) _servicePool.remove(count - 1);
197        }
198    
199        private synchronized void returnServiceToPool(PooledService pooled)
200        {
201            _servicePool.add(pooled);
202        }
203    
204        private PooledService constructPooledService()
205        {
206            try
207            {
208                Object core = constructCoreServiceImplementation();
209    
210                // This is related to bean services.
211    
212                if (!_serviceInterface.isInstance(core))
213                    core = constructBridgeProxy(core);
214    
215                registerWithShutdownCoordinator(core);
216    
217                return new PooledService(core);
218            }
219            catch (Exception ex)
220            {
221                throw new ApplicationRuntimeException(ServiceModelMessages.unableToConstructService(
222                        getServicePoint(),
223                        ex), ex);
224            }
225        }
226    
227        private void unbindPooledServiceFromCurrentThread(PooledService pooled)
228        {
229            _activeService.set(null);
230    
231            pooled.passivate();
232    
233            returnServiceToPool(pooled);
234        }
235    
236        /**
237         * Invokes {@link #getServiceImplementationForCurrentThread()} to instantiate an instance of the
238         * service.
239         */
240        public void instantiateService()
241        {
242            getServiceImplementationForCurrentThread();
243        }
244    
245    }