Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members

server.cc

00001 /*******************************************************************/
00002 /*  Class BasicTcpServer:-  Generic, one-thread-per-client server. */
00003 /*                                                                 */
00004 /*      This class can only be used as a base class ie it          */
00005 /*  cannot be instantiated on its own.                         */
00006 /*      Class BasicTcpServer takes care of socket connections      */
00007 /*      and threading. Derived classes must implement method       */
00008 /*      doServerOp. This is the operation that the specific        */
00009 /*      server performs repeatedly for each client.                */
00010 /*      <a href="mailto:Simon.Hoyle@csiro.au">Simon Hoyle</a>      */
00011 /*  Original: 15-2-2001                                        */
00012 /*      Revised:  25-8-2004                                        */
00013 /*******************************************************************/     
00014 
00015 #include <iostream>
00016 #include <string>
00017 #include <stdexcept>
00018 #include <unistd.h>
00019 #include <sys/time.h>
00020 #include <sys/stat.h>
00021 #include <fcntl.h>
00022 #include <sys/socket.h>
00023 #include <netinet/tcp.h>
00024 #include <signal.h>
00025 #include <syslog.h>
00026 #include <errno.h>
00027 #include <sched.h>
00028 #include <poll.h>
00029 #include <sys/stropts.h>
00030 
00031 #include "server.h"
00032 #include "error_desc.h"
00033 
00034 using namespace std;
00035 
00036 const int BasicTcpServer::SCHED_POLICY = SCHED_RR;
00037 
00038 bool BasicTcpServer::debugAll = false;
00039 
00040 
00041 BasicTcpServer::BasicTcpServer(int portNum, int maxClients, const char* intro) : 
00042                            port(portNum), mClients(maxClients),
00043                intro(intro), nClients(0)
00044 {
00045     if (debugAll || debug)
00046         cout << "BasicTcpServer:constructor: base port=" << port 
00047          << "  maxClients=" << maxClients << endl;
00048     sprintf(goAwayMsg, "%d client(s) connected, try again later\n", mClients);
00049     int status = pthread_mutex_init(&clientListMutex, NULL);
00050     if (status != 0) 
00051         throw SysLibErrorDesc(status, "BasicTcpServer Constructor", "pthread_mutex_init");
00052     id = "";
00053 }    
00054 
00055 void BasicTcpServer::init()
00056 {
00057     int status;
00058     Arg* arg = new Arg(this);
00059 /*
00060 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined (SOLARIS)  
00061     int min, max, scope;     
00062     status = pthread_attr_init(&threadAttr);
00063     pthread_attr_setscope(&threadAttr, PTHREAD_SCOPE_PROCESS);
00064     if (status != 0) 
00065         cout << "BasicTcpServer::init: pthread_attr_init: " << strerror(status) << endl;
00066     status = pthread_attr_setschedpolicy(&threadAttr, SCHED_POLICY);
00067     if (status != 0) 
00068         cout << "BasicTcpServer::init: pthread_attr_setschedpolicy: " << strerror(status) << endl;
00069     status = pthread_attr_getschedparam(&threadAttr, &threadParam);
00070     if (status != 0) 
00071         cout << "BasicTcpServer::init: pthread_attr_getschedparam: " << strerror(status) << endl;
00072     min = sched_get_priority_min(SCHED_POLICY);
00073     max = sched_get_priority_max(SCHED_POLICY);
00074     //threadParam.sched_priority = (min + max)/2;
00075     ++threadParam.sched_priority;
00076     status = pthread_attr_setschedparam(&threadAttr, &threadParam);
00077     if (status != 0) 
00078         cout << "BasicTcpServer::init: pthread_attr_setschedparam: " << strerror(status) << endl;
00079     status = pthread_attr_setinheritsched(&threadAttr, PTHREAD_EXPLICIT_SCHED); 
00080     if (status != 0) 
00081         cout << "BasicTcpServer::init: pthread_attr_setinheritsched: " << strerror(status) << endl; 
00082     if (debug || debugAll)
00083     {
00084         int threadPolicy;
00085         pthread_attr_getschedpolicy(&threadAttr, &threadPolicy);
00086         pthread_attr_getschedparam(&threadAttr, &threadParam);
00087         pthread_attr_getscope(&threadAttr, &scope);
00088         cout << "BasicTcpServer:" << id << ": starting serverThread:"  
00089         << "  policy=";
00090         switch (threadPolicy) {
00091             case SCHED_RR:    cout << "SCHED_RR";    break;
00092             case SCHED_FIFO:  cout << "SCHED_FIFO";  break;
00093             case SCHED_OTHER: cout << "SCHED_OTHER"; break;
00094             default:          cout << "unknown";
00095         }
00096         cout << "  priority=" << threadParam.sched_priority;
00097         cout << "  contention scope=";
00098         switch (scope) {
00099             case PTHREAD_SCOPE_PROCESS: cout << "PROCESS"; break;
00100             case PTHREAD_SCOPE_SYSTEM:  cout << "SYSTEM";  break;
00101             default:                    cout << "unknown";
00102         }
00103         cout << endl;
00104     }
00105     status = pthread_create(&serverThreadID, &threadAttr, serverThread, (void*)arg);
00106 #else 
00107 */
00108     status = pthread_create(&serverThreadID, NULL, serverThread, (void*)arg);
00109 //#endif
00110     if (status != 0) 
00111         throw SysLibErrorDesc(status, "BasicTcpServer::init", "pthread_create");
00112 /*  Test thread - for debugging only
00113     
00114     status = pthread_create(&serverThreadID, NULL, &testThread, NULL);    
00115     if (status != 0)
00116     {
00117 #ifdef DEBUG
00118        printf("Server::init: Failed to create testThread %d \n ", status);
00119 #endif
00120     return E_SERV_CREAT;
00121     }  
00122 */
00123 
00124 }
00125 
00126 BasicTcpServer::~BasicTcpServer()
00127 {
00128     if (debugAll || debug)
00129         cout << "BasicTcpServer:" << id << ":destructor" << endl;
00130     pthread_cancel(serverThreadID);
00131 }
00132 
00133 void* BasicTcpServer::testThread(void* arg)
00134 {
00135     for (int i=0; i<5; ++i)
00136     {       
00137         cout << "\tIn testThread loop " << i+1 << endl;
00138     sleep(2);
00139     }
00140     return NULL;
00141 }    
00142 
00143 
00144 void BasicTcpServer::blockOn()
00145 {
00146 #ifdef SOLARIS
00147     //select(FD_SETSIZE, NULL, NULL, NULL, NULL);
00148     poll(NULL, 0, INFTIM);
00149 #else
00150     // Since serverThread never returns, this blocks
00151     // the calling thread indefinitely.
00152     pthread_join(serverThreadID, NULL);
00153 #endif
00154 }
00155 
00156 
00157 int BasicTcpServer::readn(int fd, char* buf, int nBytes, int timeOut)
00158 {
00159     int status, nRead;
00160     fd_set readFds;
00161     struct timeval tout;
00162     register int tleft;
00163 
00164     if (debugAll || debug)
00165         cout << "BasicTcpServer:" << id << ":readn: fd=" << fd << " nBytes=" << nBytes
00166         << " timeOut=" << timeOut << endl;
00167     int n = read(fd, buf, nBytes);
00168     if (n < 0) // read error
00169     {
00170     if (errno != EINTR) throw SysLibErrorDesc(errno, "BasicTcpServer::readn", "read");
00171     nRead = 0;
00172     }
00173     else if (n == 0) // connection closed
00174     {
00175         if (debugAll || debug)
00176             cout << "BasicTcpServer:" << id << ":readn: Zero bytes read" << endl;
00177     return 0;
00178     }       
00179     else nRead = n;
00180 
00181     // Minimum timeout is 1 microsec per byte
00182     if (timeOut < nBytes) timeOut = nBytes; 
00183 
00184     while (nRead < nBytes)
00185     {
00186     FD_ZERO(&readFds);
00187         FD_SET(fd, &readFds);
00188         // Calculate remaining time based on number of unread bytes.
00189     // A fudge - should work well in most cases, good enough in
00190     // unusual cases.
00191         tleft = ((nBytes - nRead) * timeOut)/nBytes;
00192         tout.tv_sec = tleft / 1000000;
00193         tout.tv_usec = tleft % 1000000;
00194 
00195     status = select(FD_SETSIZE, &readFds, NULL, NULL, &tout);
00196     if (status < 0) 
00197         throw SysLibErrorDesc(errno, "BasicTcpServer::readn", "select");
00198     if (FD_ISSET(fd, &readFds))
00199     {
00200             n = read(fd, &buf[nRead], nBytes - nRead);
00201         if (n < 0) // read error
00202         {
00203         if (errno == EINTR) continue;
00204                 throw SysLibErrorDesc(errno, "BasicTcpServer::readn", "read"); 
00205             }
00206         if (n == 0) // connection closed
00207             {
00208                 if (debugAll || debug)
00209             cout << "BasicTcpServer:" << id << ":readn: Client has closed connection, " 
00210              << nRead << " bytes read" << endl;
00211         break;
00212         }
00213         nRead += n;
00214     }
00215     else break; // timeout
00216     }   
00217     return nRead;
00218 }
00219 
00220 
00221 int BasicTcpServer::readLine(int fd, char* buf, int maxBytes)
00222 {
00223 //  Reads a line of text from a socket. 
00224 //  Returns number of bytes after a newline
00225 //  is encountered or maxBytes have been read.
00226 //  Returns zero if client closes the connection.
00227     int nRead = 0;
00228 
00229     while (nRead < maxBytes)
00230     {
00231         int res = read(fd, &buf[nRead], 1);
00232     if (res < 0)
00233     {
00234         if (errno == EINTR) continue;
00235             throw SysLibErrorDesc(errno, "BasicTcpServer::readLine", "read");
00236     }
00237     if (res == 0)
00238     {
00239             if (debugAll || debug)
00240             cout << "BasicTcpServer:" << id << ":readLine: Client has closed connection, "
00241              << nRead << " bytes read" << endl;
00242         break;
00243     }
00244     if (buf[nRead++] == '\n') break;
00245     }
00246     return nRead;
00247 }
00248 
00249 
00250 void BasicTcpServer::onConnect(ClientInfo* curr)
00251 {
00252     if (debugAll || debug) 
00253         cout << "BasicTcpServer:" << id << ":Client connected: " 
00254         << "base port=" << port << "  connected port=" << curr->port
00255         << "  ip=" << curr->ip << "  host=" << curr->host << endl;
00256 }
00257 
00258 void BasicTcpServer::onDisconnect(ClientInfo* curr)
00259 {
00260     if (debugAll || debug) 
00261         cout << "BasicTcpServer:" << id << ":Client disconnected: " 
00262         << "base port=" << port << "  ip=" << curr->ip 
00263         << "  host=" << curr->host << endl;     
00264 }
00265 
00266 void* BasicTcpServer::serverThread(void* arg)
00267 {
00268     // Use local pointer to make code more readable
00269     BasicTcpServer* obj = ((Arg*)arg)->obj;
00270     if (debugAll || obj->debug) 
00271         cout << "BasicTcpServer:" << obj->id << ":serverThread started" << endl;
00272     struct sockaddr_in sa;
00273 #ifdef SOLARIS
00274     size_t addrlen;
00275 #else    
00276     socklen_t addrlen;
00277 #endif
00278     pthread_t clientThreadID;
00279     // Must block SIGPIPE. This signal is generated when the server
00280     // tries to write to a closed socket. Mask it here so that all
00281     // ClientHandler threads inherit the mask.
00282     sigset_t sigSet;
00283     sigemptyset(&sigSet);
00284     sigaddset(&sigSet, SIGPIPE);
00285 #ifdef SOLARIS
00286     sigaddset(&sigSet, SIGLWP);
00287 #endif
00288     pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
00289     if (debugAll || obj->debug)
00290         cout << "BasicTcpServer:" << obj->id << ":serverThread detaching" << endl;
00291     pthread_detach(pthread_self());
00292     try {
00293     // Open socket, bind and listen
00294     int listenFd = socket(AF_INET, SOCK_STREAM, 0);
00295     if (listenFd < 0)
00296     { 
00297         perror("BasicTcpServer::serverThread: ");
00298         throw SysLibErrorDesc(errno, "BasicTcpServer::serverThread", "socket");
00299     }
00300     // Set re-use address socket option
00301     char on = 1;
00302     if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)) < 0)
00303         throw SysLibErrorDesc(errno, "BasicTcpServer::serverThread", "setsockopt");
00304     if (setsockopt(listenFd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(int)) < 0)
00305         throw SysLibErrorDesc(errno, "BasicTcpServer::serverThread", "setsockopt");
00306     memset(&sa, 0, sizeof(sa));
00307     sa.sin_family = AF_INET;
00308     sa.sin_port = htons(obj->port);
00309     sa.sin_addr.s_addr = htonl(INADDR_ANY);
00310     addrlen = sizeof(struct sockaddr);
00311     if (bind(listenFd, (struct sockaddr*)&sa, addrlen) < 0)
00312         throw SysLibErrorDesc(errno, "BasicTcpServer::serverThread", "bind");
00313     if (listen(listenFd, obj->mClients) < 0) 
00314         throw SysLibErrorDesc(errno, "BasicTcpServer::serverThread", "listen");
00315     while (1)
00316     {
00317         // Block waiting for connections
00318         if (debugAll || obj->debug) 
00319         cout << "BasicTcpServer:" << obj->id << ":serverThread: Waiting for clients to connect ..." 
00320             << endl;
00321 #ifdef SOLARIS
00322         int acceptFd = accept(listenFd, (struct sockaddr*)&sa, (int*)&addrlen);
00323 #else
00324         int acceptFd = accept(listenFd, (struct sockaddr*)&sa, &addrlen);
00325 #endif  
00326         if (acceptFd < 0) continue;
00327     // Check number of client connections, don't accept
00328     // any more if the maximum has been reached.
00329         if (obj->nClients >= obj->mClients)
00330         {
00331             write(acceptFd, obj->goAwayMsg, strlen(obj->goAwayMsg));
00332         close(acceptFd);
00333         continue;
00334     }
00335         ClientInfo* currClient = new ClientInfo(acceptFd, &sa);
00336         // Spawn another client handler thread
00337     Arg* arg = new Arg(obj, currClient);
00338 /*
00339 #if defined (_POSIX_THREAD_PRIORITY_SCHEDULING) && !defined (SOLARIS)
00340     if (debugAll || obj->debug)
00341     {
00342         int threadPolicy;
00343         pthread_attr_getschedpolicy(&obj->threadAttr, &threadPolicy);
00344         pthread_attr_getschedparam(&obj->threadAttr, &obj->threadParam);
00345         cout << "BasicTcpServer:" << obj->id << ": starting clientHandlerThread:"  
00346         << "  policy=";
00347         switch (threadPolicy) {
00348             case SCHED_RR:    cout << "SCHED_RR";    break;
00349             case SCHED_FIFO:  cout << "SCHED_FIFO";  break;
00350             case SCHED_OTHER: cout << "SCHED_OTHER"; break;
00351             default:          cout << "unknown";
00352         }
00353         cout << "  priority=" << obj->threadParam.sched_priority << endl;
00354     }
00355         pthread_create(&clientThreadID, &obj->threadAttr, clientHandlerThread, arg);
00356 #else
00357 */
00358         pthread_create(&clientThreadID, NULL, clientHandlerThread, arg);
00359 //#endif
00360     }
00361     } catch(const std::exception& e) { std::cerr << "Server::serverThread: " << e.what() << std::endl; }
00362 
00363     return NULL;
00364 }
00365 
00366 
00367 void* BasicTcpServer::clientHandlerThread(void* arg)
00368 {
00369     // Use local variables to make code more readable
00370     BasicTcpServer* obj = ((Arg*)arg)->obj;
00371     ClientInfo* ci = ((Arg*)arg)->ci;
00372     int sockFd = ci->sockFd; 
00373     int status;
00374     //int cancelType;
00375 
00376     //pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &cancelType);
00377     pthread_t myID = pthread_self();
00378     pthread_detach(myID);
00379     ci->tid = myID;
00380     try {
00381     status = pthread_mutex_lock(&obj->clientListMutex); 
00382     if (status < 0)
00383         throw SysLibErrorDesc(status, "BasicTcpServer::clientHandlerThread", "pthread_mutex_lock");
00384     ++obj->nClients;
00385     if (debugAll || obj->debug)
00386         cout << "BasicTcpServer:" << obj->id  << ":clientHandlerThread: nClients=" 
00387         << obj->nClients << endl;
00388     // Add current client to list
00389     obj->clientList.push_back(ci);  
00390     status = pthread_mutex_unlock(&obj->clientListMutex);
00391     if (status < 0)
00392         throw SysLibErrorDesc(status, "BasicTcpServer::clientHandlerThread", "pthread_mutex_unlock");      
00393     if (obj->intro != NULL)
00394         write(sockFd, obj->intro, strlen(obj->intro));
00395     obj->onConnect(ci);
00396     // Loop forever executing doServerOp on the client socket connection.
00397     while (1)
00398     {
00399         try {
00400         status = obj->doServerOp(ci);
00401     // NB: A negative return value from doServerOp closes the 
00402     // connection.
00403         if (status < 0) break; 
00404         } catch(const std::exception& e) { std::cerr << e.what() << std::endl; }  
00405     }
00406     if (pthread_mutex_lock(&obj->clientListMutex)) 
00407         throw SysLibErrorDesc(errno, "BasicTcpServer::clientHandlerThread", "pthread_mutex_lock");
00408     close(sockFd);
00409     --obj->nClients;
00410     obj->clientList.remove(ci);
00411     obj->onDisconnect(ci);
00412     if (pthread_mutex_unlock(&obj->clientListMutex)) 
00413         throw SysLibErrorDesc(errno, "BasicTcpServer::clientHandlerThread", "pthread_mutex_unlock");
00414     if (debugAll || obj->debug)
00415         cout << "BasicTcpServer:" << obj->id << ":clientHandlerThread: Client dropped, nClients=" 
00416          << obj->nClients << endl;
00417     delete ci;
00418     } catch(const std::exception& e) { std::cerr << "Server::serverThread: " << e.what() << std::endl; }
00419 
00420     return NULL;        
00421 }
00422 

Generated on Mon Apr 30 13:32:39 2007 for Parkes M & C - PKMC C++ library API by  doxygen 1.4.4