#include "server.h" void *get_client_addr(struct sockaddr *sa) { if (sa->sa_family == AF_INET) { return &(((struct sockaddr_in *) sa)->sin_addr); } return &(((struct sockaddr_in6 *) sa)->sin6_addr); } int Server::exec() { if (!createSocket()) { return -1; } std::cout << "Server: server created and listening on port " << port << std::endl; while (true) { std::cout << "Server: awaiting connection...\n"; ClientData *data = new ClientData(); // would be deleted in thread after its finishing data->sockd = accept(sock, (struct sockaddr *) &data->client_sockaddr, &data->client_sockaddr_size); std::cout << "Server: got new connection, creating worker thread." << std::endl; clients_threads.emplace_back(std::thread(&Server::handleRequest, this, data)); std::cout << "Server: created worker thread " << clients_threads.back().get_id(); } } bool Server::createSocket() { std::cout << "Server: creating socket..." << std::endl; sock = socket(AF_INET, SOCK_STREAM, 0); int on = 1; setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &on, sizeof(on)); /* initialize the server's sockaddr */ struct sockaddr_in server_sockaddr; memset(&server_sockaddr, 0, sizeof(server_sockaddr)); server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); server_sockaddr.sin_port = htons(port); printf("Server: Binding socket %d to sockaddr %p with size %d\n", sock, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr)); int bind_result = bind(sock, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr)); if (bind_result < 0) { std::cout << "Server: Error: socket bind failed!" << std::endl; return false; } listen(sock, max_connections); if (sock < 0) { std::cout << "Server: Error, cannot create socket" << std::endl; return false; } return true; } void Server::handleRequest(ClientData* client_data) { char ip[17]; inet_ntop(AF_INET, get_client_addr((struct sockaddr *)&client_data->client_sockaddr), ip, sizeof(ip)); printf("Worker %u: Established connection with %s beginning work.\n", std::this_thread::get_id(), ip); const int request_buffer_size = 65536; char request[request_buffer_size]; // Receiving client type: reader or writer int bytes_recvd = recv(client_data->sockd, request, request_buffer_size - 1, 0); if (bytes_recvd < 0) { fprintf(stderr, "Worker %u: error recv: %s\n", std::this_thread::get_id(), strerror(errno)); delete client_data; return; } request[bytes_recvd] = '\0'; if (strcmp(request, "reader") == 0) { printf("Worker %u: Client %s is READER.\n", std::this_thread::get_id(), ip); client_readers_lock.lock(); clients_readers.push_back(*client_data); client_readers_lock.unlock(); } else if (strcmp(request, "writer") == 0) { printf("Worker %u: Client %s is WRITER.\n", std::this_thread::get_id(), ip); while (true) { int bytes_recvd = recv(client_data->sockd, request, request_buffer_size - 1, 0); if (bytes_recvd <= 0) { fprintf(stderr, "Worker %u: error recv: %s\n", std::this_thread::get_id(), strerror(errno)); delete client_data; return; } request[bytes_recvd] = '\0'; printf("Worker %u: received message: %s\n", std::this_thread::get_id(), request); client_readers_lock.lock(); for (int i = 0; i < clients_readers.size(); ++i) { send(clients_readers[i].sockd, request, bytes_recvd, 0); } client_readers_lock.unlock(); } } else { printf("Worker %u: Client %s is UNRECOGNIZED. Error!\n", std::this_thread::get_id(), ip); } }