скорость обработки клиентов в сервере на джава.

Тема в разделе "WASM.NETWORKS", создана пользователем neutronion_old_school, 13 дек 2017.

  1. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    Здесь хочется обсудить наилучшие методики скорости обратоки соединений. Для чего это полезно, ну в первую очередь для создания соц. сетей, где скорость обработки клиентов важная часть. Готов обсудить скоростной сервер, Садко. Можешь показать код, который быстро обрабатывает соединения, обсудим и я покажу полностью свой код. И его обсудим тоже.
     
  2. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    Садко, теперь обсудим скорость обработки. Есть еще одно узкое место это создание объектов во время обработки запросов клиентов. Так как на создание объектов требуется много проц. времени. Так?
     
  3. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    А что если я тебе предложу следущий вариант: Что если объекты уже будут созданы при инициализации сервера, тогда не прийдется их создавать, а значит скорость обработки запросов в сервере сильно ускорится? Это узкое горлышко номер 2.
     
  4. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    Третье узкое горлышко, это синхронизация записи в одну таблицу в базе данных. Это узкое горлышко номер 3.
    Еще одно узкое горлышко это размеры данных которые необходимо передавать, но это думаю уже наверно относится к дизайну самого сервера. Его идеалогии. То есть узкое горлышко номер 4.
     
  5. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Ничего быстрее accept() и помещения сокета в очередь на обработку не придумали, так как это всё требует минимальных вычислительных скоростей.
    Здесь важно не заткнуться по TIMED_WAIT - специальное состояние сокета после того, как его закрыли, подразумевающее сбор в течение нескольких минут пакетов, заблудившихся по сети.
    Также можно сэкономить на handshake, потому что, во-первых, это долгая процедура, а, во-вторых, TCP ввиду своей особенности раскочегаривается несразу, а отправляет сначала за раз один пакет, затем два и т.д. Поэтому если есть возможность поддерживать постоянное соединение между клиентом и сервером, это надо делать.
     
  6. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Это называется reuse объектов. Вещь, которая позволяет поднять производительность (не надо дёргать хип), но чревата своими проблемами: реюзанные объекты надо контролить, при большой связности с другими объектами это становится делать всё тяжелее и тяжелее.
     
  7. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    SadKo, Я предлагаю такую реализацию и она у меня есть. Да, совершенно верно сначала accept()
    но у тебя это делается в одном потоке(этого не избежать), но обработка тоже осуществляется в одном потоке, а это не гуд.
     
  8. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Использовать batch-операции вместо однократного вызова SQL-инструкции в БД. Если вызвать batch проблематично, то, возможно, следует перепроектировать систему так, чтобы отдельный поток накапливал batch'и и выполнял их с заданной периодичностью. За счёт этого падает нагрузка на СУБД, падает нагрузка на сеть, и среднее время вставки одной записи понижается в разы.
    Но и надёжность, соответственно, тоже страдает.
     
  9. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    согласен, есть опасность забыть данные предыдущего клиента, и синхронизация нужна будет.
     
  10. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    не надо ни какого батча, щас поясню как работает база данных.
    видишь эту штуку
    c.setAutoCommit(false); Знаешь, что это значит?
    Ты приготавливаешь стейтменты сколько угодно
    stat = c.prepareStatement(sql); Правильно, а в конце после того как ты создал все
    statements своими sql. Ты уже делаешь так
    c.commit(); Это и есть батч, который одним стейтментом посылается. И если что то пойдет не так, то произойдет откат.
    То есть говоря другими словами, к примеру, ты создаешь апдейты многих таблиц приготавливая стейтменты на каждую таблицу, но запрос в базу посылается только при c.commit() и если хоть какая ошибка происходит, происходит откат, так как например postresql журналираемая база.
     
    Последнее редактирование: 13 дек 2017
  11. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    SadKo, Ну что более скоростных реализация соединений с серваком нету у тебя. Если нет, то тогда перейдем к обсуждению сервера к которому я пришел в ходе ресерча длинной в два месяца.
     
  12. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    А вы думаете, я работаю в режиме autoCommit()? Так никакая целостность данных не будет в БД соблюдена.
    Создание Statement - дорогая операция. Лучше один раз создать Statement для пачки строк и запустить один раз batch.
    Если вы делаете statement.execute() или statement.executeUpdate(), то он моментально отправляется на сервак, после чего ваш клиент ждёт ответа об успешности/неуспешности операции.
    То есть, на каждый statement вы тратите время на пересыл по сети, ожидание выполнения и приём ответа. Если сервак висит на локалхосте, это незаметно. Но стоит только через какой-нибудь узкий тоннель подключиться - всё, труба, у вас куча времени улетает на ожидание ввода-вывода.
    Правильнее для вставки пачки данных в одну таблицу делать batch. В этом случае ВСЕ данные batch'а однократно отправляются на сервер, и потом вы получаете один ответ сразу на все строчки.

    Вот пример. Здесь важно выбрать правильно размер batch. Для oracle это порядка 10к записей, для MySQL в районе 4-5к.
    Код (Text):
    1.  
    2.   protected void importData(Connection conn, Tm_BulkMessage message) throws
    3.      SQLException, InterruptedException, Tm_ConversionException
    4.    {
    5.      // Create statement
    6.      long nRowCount = 0;
    7.      PreparedStatement i_stmt = null;
    8.      
    9.      try
    10.      {
    11.        // Produce insert query
    12.        i_stmt = conn.prepareStatement("INSERT INTO ...");
    13.        
    14.        // Batch size
    15.        int batch_size = 0;
    16.        
    17.        // For each record in message
    18.        for (Object fields: message.getMessage())
    19.        {
    20.          // Try to translate
    21.          if (safeTranslate(m_InsertBridge, message, fields, i_stmt))
    22.          {
    23.            i_stmt.addBatch();
    24.            nRowCount++;
    25.            if ((++batch_size)>=m_BatchSize)
    26.            {
    27.              if (m_Logger.isDebugEnabled())
    28.                m_Logger.debug("executing batch next_row=" + nRowCount);
    29.              i_stmt.executeBatch();
    30.              i_stmt.clearBatch();
    31.              batch_size = 0;
    32.            }          
    33.          }
    34.        }
    35.        
    36.        // Execute batch statement if there are instructions in batch
    37.        if (batch_size>0)
    38.        {
    39.          if (m_Logger.isDebugEnabled())
    40.            m_Logger.debug("executing batch next_row=" + nRowCount);
    41.          i_stmt.executeBatch();
    42.        }
    43.      }
    44.      finally
    45.      {
    46.        // Send all traps
    47.        sendQueuedTraps();
    48.        Tm_SQLHelper.safeCloseStatement(i_stmt);
    49.      }
    50.    }
    51.  
     
  13. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    понятно, это ключевая строка
    i_stmt.executeBatch();
    То, есть это посылается на сервер базы, но не выполняется так я понимаю?
    и в конце когда все уже отправлены, посылается команда серверу произвести
    инсерт? Правилно понял? Так покрасивей будет, согласен.
    А что если эти банчи посылают сотни клиентов, не нагрузит ли это сервак? И как тогда быть с синхронизацией?
    И не кажется тебе что postgresql лучше субд, чем мускул?
     
  14. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    И посылается, и выполняется. Только вместо одной записи сразу вставляется m_BatchSize записей. В итоге экономим кучу времени на IO.
    В конце досылается остаток, меньший m_BatchSize, если таковой имеется.
     
  15. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    SadKo, чего то я не понял, выполняется же после этого
    i_stmt.executeBatch();
    иначе зачем тогда эта команда? а все понял, остаток...
     
  16. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    SadKo, Не ну слушай, тогда у тебя этот метод будет висеть довольно долго
    protected void importData
    И если не реализуется отдельный поток для каждого клиента, тогда будет затык.
     
  17. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Поэтому я и говорю, что можно спроектировать систему так, что она будет копить запросы на вставку от других потоков, и когда подрастёт размер батча/произойдёт таймаут синхронизации - будет выполнять батч на базе. То есть, сам поток клиента в принципе не будет работать с СУБД.
     
  18. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    SadKo, Ладно, спасибо. Завтра я подбробно покажу и расскажу о своем сервака. Вот его и обсудим и покритикуем. На сегодня форума мне уже дастаточно.
     
  19. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    Привет, Садко, как было обещано:
    Даю код, за форматирование и лишние детали прошу прощения, не рефакторил еще
    но в целом я доволен проделанной работой. Однако, после того как изучишь код,
    мне бы хотелось узнать твоего мнения, меня интересуют некоторые детали,
    в которых как мне кажется ты компетентен и сможешь внести полезную лепту.
    В общем изучай, если что-то будет неясно, либо критика добро пожаловать, обсудим.
    Код сервера:
    Код (Text):
    1.  
    2. /*
    3.  * To change this license header, choose License Headers in Project Properties.
    4.  * To change this template file, choose Tools | Templates
    5.  * and open the template in the editor.
    6.  */
    7. package ServakPro;
    8.  
    9. /**
    10.  *
    11.  * @author bb
    12.  */
    13. import Debugger.NetDebugger;
    14. import Debugger.NetDebuggingInter;
    15. import Servak.Db.Experimental.SettingsBandit;
    16. import java.io.*;
    17. import java.net.*;
    18. import java.util.concurrent.ArrayBlockingQueue;
    19. import java.util.concurrent.BlockingQueue;
    20.  
    21. public class Server extends Object implements NetDebuggingInter {
    22.  
    23.   static final int INITIALWORKERTHREADS = 20;// tune this!
    24.   static final int port = SettingsBandit.SERVER_PORT;
    25.   BlockingQueue<Socket> queue;
    26.   private ServerSocket ss;
    27.   private Thread internalThread;
    28.   private volatile boolean noStopRequested;
    29.   private NetDebugger netDebugger = new NetDebugger();
    30.  
    31.   private void init() {
    32.   for (int i = 0; i < INITIALWORKERTHREADS; i++) {
    33.   new Thread(new MultiSessionTask(ss, queue)).start();
    34.   }
    35.   }
    36.  
    37.   public Server(BlockingQueue<Socket> q) throws IOException {
    38.   ss = new ServerSocket(port);
    39.   noStopRequested = true;
    40.   this.queue = q;
    41.   this.init();
    42.  
    43.   Runnable r = new Runnable() {
    44.   public void run() {
    45.   try {
    46.   runWorkFast();
    47.   } catch (Exception x) {
    48.   netDebugger.fireServerFaultEvent("SERVER: there is exception: ", x);
    49.   x.printStackTrace();
    50.   }
    51.   }
    52.   };
    53.   internalThread = new Thread(r);
    54.   internalThread.setName("My server Thread");
    55.   internalThread.start();
    56.   }
    57.  
    58.   private void runWorkFast() {
    59.   for (;;) {
    60.   try {
    61.   Socket sock = ss.accept();
    62.   int thr = MultiSessionTask.waitCount();
    63.   if (thr > 0) {
    64.   queue.add(sock);
    65.   } else {
    66.   queue.add(sock);
    67.   new Thread(new DynamicMultiSessionTask(ss, queue)).start();
    68.   }
    69.   } catch (IOException exc) {
    70.   exc.printStackTrace();
    71.   }
    72.   if (ss.isClosed()) {
    73.   break; // for (;;)
    74.   }
    75.   } // catch    
    76.   }
    77.  
    78.   public synchronized void stopRequest() {
    79.   noStopRequested = false;
    80.   //internalThread.interrupt();
    81.   if (ss != null) {
    82.   try {
    83.   ss.close();
    84.   } catch (IOException x) {
    85.   // ignore
    86.   } finally {
    87.   ss = null;
    88.   }
    89.   }
    90.   }
    91.  
    92.   public boolean isAlive() {
    93.   return internalThread.isAlive();
    94.   }
    95.  
    96.   public static void main(String[] args) {
    97.   try {
    98.   BlockingQueue<Socket> bq = new ArrayBlockingQueue<>(40);
    99.   Server serv = new Server(bq);
    100.   } catch (IOException ex) {
    101.   }
    102.   }
    103.  
    104.   @Override
    105.   public void clientFault(String str, Exception ex) {
    106.   throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    107.   }
    108.  
    109.   @Override
    110.   public void serverFault(String str, Exception ex) {
    111. //   throw new Exception(ex);
    112.   }
    113.  
    114.   @Override
    115.   public void debuggerInfo(String str) {
    116.   throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    117.   }
    118.  
    119.   public void addDebuggerListener(NetDebuggingInter inter) {
    120.   this.netDebugger.addDebuggerListener(inter);
    121.   }
    122.  
    123.   public void removeDebuggerListener(NetDebuggingInter inter) {
    124.   this.netDebugger.removeDebuggerListener(inter);
    125.   }
    126.  
    127. }
    128.  
    129.  
    код MultiSessionTask
    что находится в MultiSessionTask мы обсуждать не будем, но как видишь там
    используется моя реализация паттерна State.
    Код (Text):
    1.  
    2. /*
    3.  * To change this license header, choose License Headers in Project Properties.
    4.  * To change this template file, choose Tools | Templates
    5.  * and open the template in the editor.
    6.  */
    7. package ServakPro;
    8.  
    9. import NetCommands.CONSTANTANIDE;
    10. import NetCommands.ConnectedAck;
    11. import NetCommands.NetworkCommunicator;
    12. import Servak.Factories.ManagerFactory;
    13. import Servak.States.Context;
    14. import Servak.States.FinishedState;
    15. import Servak.States.ServerState;
    16. import java.io.IOException;
    17. import java.io.ObjectInputStream;
    18. import java.io.ObjectOutputStream;
    19. import java.net.ServerSocket;
    20. import java.net.Socket;
    21. import java.net.SocketException;
    22. import java.util.concurrent.BlockingQueue;
    23.  
    24. /**
    25.  *
    26.  * @author bb
    27.  */
    28. public class MultiSessionTask implements SessionTask {
    29.  
    30.   //this will deal with a client
    31.   private ServerSocket ss;
    32.   private BlockingQueue<Socket> queue;
    33.   private static int waitCount = 0;
    34.   private ObjectInputStream dataIn;
    35.   private ObjectOutputStream dataOut;
    36.   private NetworkCommunicator net;
    37.  
    38.   MultiSessionTask(ServerSocket ss,
    39.   BlockingQueue<Socket> queue) {
    40.   this.ss = ss;
    41.   this.queue = queue;
    42.   }
    43.  
    44.   public void run() {
    45.   while (!ss.isClosed() || !queue.isEmpty()) {
    46.   try {
    47.   waitCount++;
    48.   Socket connection = queue.take();
    49.   waitCount--;
    50.   processSession(connection);
    51.   } catch (InterruptedException e) {
    52.   } catch (Exception ex) {
    53.   ex.printStackTrace();
    54.   }
    55.   }
    56.   }
    57.  
    58.   public void processSession(Socket conn) throws SocketException, IOException, Exception {
    59.   //conn.setSoTimeout(CONSTANTANIDE.TIME_TO_READWAIT);
    60.   dataOut = new ObjectOutputStream(conn.getOutputStream());
    61.   dataOut.flush();
    62.   dataIn = new ObjectInputStream(conn.getInputStream());
    63.   net = new NetworkCommunicator(this.dataIn, this.dataOut);
    64.   //this.netDebugger = bugger;
    65.  
    66.   ManagerFactory factory
    67.   = new ManagerFactory(net);
    68.  
    69.   ServerState state = factory.getState(new ConnectedAck());
    70.   Context cont = factory.getContext();
    71.   cont.setState(state);
    72.   while (!(cont.getState() instanceof FinishedState)) {
    73.   cont.getState().doAction();
    74.   }
    75.   }
    76.  
    77.   public static int waitCount() {
    78.   return waitCount;
    79.   }
    80.  
    81. }
    82.  
    Для понимания наверно это тоже будет полезно
    Код (Text):
    1.  
    2. /*
    3.  * To change this license header, choose License Headers in Project Properties.
    4.  * To change this template file, choose Tools | Templates
    5.  * and open the template in the editor.
    6.  */
    7. package ServakPro;
    8.  
    9. import java.io.IOException;
    10. import java.net.Socket;
    11. import java.net.SocketException;
    12.  
    13. /**
    14.  *
    15.  * @author bb
    16.  */
    17. public interface SessionTask extends Runnable {
    18. // inherited from Runnable
    19.  
    20.   public void run();
    21. // process a session
    22.  
    23.   public void processSession(Socket connection) throws SocketException, IOException, Exception;
    24. }
    25.  
    26.  
    класс DynamicMultiSessionTask я показывать не буду это то же самое, что
    и MultiSessionTask.
    Вот и все. Жду комментариев. Сначала желательно пусть выскажется Sadko, он уже более менее
    в теме. Однако, если есть сказать тому, кто специалист и понимает код, добро пожаловать.
     
  20. neutronion_old_school

    neutronion_old_school Попугай Сильвера

    Публикаций:
    0
    Регистрация:
    23 июл 2017
    Сообщения:
    411
    форматирование не сохранилось, увы, не моя вина. Но в Netbeans клавишами ctl+alt+f это легко поправить.