001//@formatter:off
002/*
003 * Abstract Threaded Socket server
004 * Code-Beispiel zum Buch Patterns Kompakt, Verlag Springer Vieweg
005 * Copyright 2014 Karl Eilebrecht
006 * 
007 * Licensed under the Apache License, Version 2.0 (the "License"):
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019//@formatter:on
020package de.calamanari.pk.util;
021
022import java.io.IOException;
023import java.net.ServerSocket;
024import java.net.Socket;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.ThreadFactory;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.slf4j.event.Level;
032
033/**
034 * Abstract Threaded Socket server<br>
035 * This is a threaded socket server, it can accept and process an arbitrary number of connections concurrently.<br>
036 * A subclass only has to implement the concrete communication operations, the other stuff is handled by logic in the super classes (TEMPLATE METHOD
037 * pattern).<br>
038 * 
039 * @author <a href="mailto:Karl.Eilebrecht(a/t)calamanari.de">Karl Eilebrecht</a>
040 */
041public abstract class AbstractThreadedSocketServer extends AbstractConsoleServer {
042
043    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractThreadedSocketServer.class);
044
045    /**
046     * port the mock server listens
047     */
048    private volatile int serverPort;
049
050    /**
051     * reference to serverSocket
052     */
053    // Volatile is sufficient as there is no race condition in this scenario
054    @SuppressWarnings("java:S3077")
055    protected volatile ServerSocket serverSocket = null;
056
057    /**
058     * Thread management
059     */
060    private final ExecutorService executorService;
061
062    /**
063     * Creates new mock without starting it yet.
064     * 
065     * @param serverName name of the new server
066     */
067    public AbstractThreadedSocketServer(String serverName) {
068        super(serverName);
069        this.executorService = createExecutorService();
070    }
071
072    /**
073     * This method handles the socket communication (usually the job of the native legacy library).<br>
074     * A call to this method returns immediately while the execution part runs in its own thread. This allows the server (the caller of this method) to proceed
075     * with accepting further requests. <br>
076     * This TEMPLATE METHOD delegates single communication handling to subclasses.
077     * 
078     * @param socket accepted socket
079     */
080    protected void handleSocketCommunicationThreaded(final Socket socket) {
081        executorService.execute(() -> {
082            try {
083                LOGGER.debug("{} Connected to {}", Thread.currentThread().getName(), socket.getInetAddress());
084                handleSocketCommunication(socket);
085            }
086            catch (Exception ex) {
087                LOGGER.error("Error during socket communication.", ex);
088            }
089            finally {
090                CloseUtils.closeResourceCatch(socket);
091            }
092            LOGGER.debug("{} disconnected.", Thread.currentThread().getName());
093        });
094    }
095
096    @Override
097    protected void configureInstance(String[] cmdLineArgs) {
098        int port = getDefaultPort();
099        if (cmdLineArgs != null && cmdLineArgs.length > 0) {
100            try {
101                port = Integer.parseInt(cmdLineArgs[0]);
102            }
103            catch (Exception ex) {
104                LOGGER.warn("Error parsing port='{}', using default={}", cmdLineArgs[0], port, ex);
105            }
106        }
107        this.serverPort = port;
108    }
109
110    @Override
111    protected void prepare() {
112        try {
113            serverSocket = new ServerSocket(serverPort);
114        }
115        catch (IOException | RuntimeException ex) {
116            throw new SocketPreparationException(ex);
117        }
118    }
119
120    @Override
121    protected String createStartupCompletedMessage() {
122        return this.getServerName() + " started - listening on port " + this.serverPort;
123    }
124
125    @Override
126    protected void doRequestProcessing() {
127        while (serverSocket != null) {
128            try {
129                // socket will be closed by the responsible worker thread
130                Socket socket = serverSocket.accept();
131                handleSocketCommunicationThreaded(socket);
132            }
133            catch (Exception ex) {
134                if (serverSocket.isClosed()) {
135                    serverSocket = null;
136                    break;
137                }
138                else {
139                    LOGGER.error("Communication error!", ex);
140                }
141            }
142        }
143    }
144
145    @Override
146    protected void initiateShutdown() {
147        CloseUtils.closeResourceCatch(Level.WARN, serverSocket);
148    }
149
150    @Override
151    protected void cleanUp() {
152        CloseUtils.closeResourceCatch(Level.WARN, serverSocket);
153        executorService.shutdown();
154    }
155
156    /**
157     * Method for handling a single socket communication<br>
158     * Subclasses communicate to the client using the socket's streams.<br>
159     * This method must be implemented thread-safe.
160     * 
161     * @param socket (for communication, will be closed by caller automatically)
162     * @throws SocketCommunicationException delegate handling to caller
163     */
164    protected abstract void handleSocketCommunication(Socket socket) throws SocketCommunicationException;
165
166    /**
167     * Method returning a default port.<br>
168     * Subclasses return here a standard port for this server.
169     * 
170     * @return default port
171     */
172    protected abstract int getDefaultPort();
173
174    /**
175     * This method returns an appropriate ExecutorService for Thread-management. <br>
176     * Subclasses may override this method to gain control about pool size and behavior.
177     * 
178     * @return executor service
179     */
180    protected ExecutorService createExecutorService() {
181        return Executors.newCachedThreadPool(new WorkerThreadFactory());
182    }
183
184    /**
185     * Returns new factory for threads, the default implementation returns daemon threads
186     */
187    protected class WorkerThreadFactory implements ThreadFactory {
188        @Override
189        public Thread newThread(Runnable r) {
190            Thread t = new Thread(r);
191            t.setName("WorkerThread('" + getServerName() + "'):@" + Integer.toHexString(t.hashCode()));
192            t.setDaemon(true);
193            return t;
194        }
195    }
196
197}