crossplatform.ru

Здравствуйте, гость ( Вход | Регистрация )

2 страниц V   1 2 >  
Ответить в данную темуНачать новую тему
> QtcpSocket. Проблема с асинхронностью чтения данных
Andrewshkovskii
  опции профиля:
сообщение 14.12.2011, 23:33
Сообщение #1


Активный участник
***

Группа: Участник
Сообщений: 351
Регистрация: 27.12.2008
Пользователь №: 467

Спасибо сказали: 18 раз(а)




Репутация:   1  


Друзья, в ниже изложенном алгоритме приема данных из сокета я наблюдаю проблему, которую не знаю как решить, может вы мне поможете?
Условие передачи данных по сокетам - %длина_передаваемых_данных%|%сериализированный_json% .
Слот чтения данных соединен с сигналом readyRead. Обменивается сервер и клиент т.н. "Командами", сериализованный JSON с обязательным атрибутом 'command'.
Приходит команда от сервероного сокета (на сервер сайде используются не qt-сокеты, а питоновские, из модуля socket).
Почему-то, при приеме сравнительно большем объеме данных (100к+ символов) возникает следующая проблема:

Но я не успеваю получить их полностью и в этот поток данных команды вливается, почему-то, ещё один поток данных, из следующей команды. Соответственно, при достижении нужного размера полученных данных я отправляю на обработку не валидный JSON. А происходит это так , допустим есть команда, содержащая 100к+ символов. ОТправляем сокету, и пока сокет её получает, отправляет ещё одну, и ещё одну. и всё это дело почему-то перемешивается. А недостающие куски потом прилетают.
Как это может быть?Разве слоты обрабатываются асинхронного ? (софтина не многопоточная). Да и потом, протокол обеспечивает "правильную" последовательность данных, а тут как-то наоборот. Может я не верно слот с сигналом соединил? Использую следующий код :

# -*- coding: utf-8 -*-
from PyQt4.QtCore import QVariant, pyqtSignal, Qt
from PyQt4.QtNetwork import QTcpSocket
from re import match

MAX_WAIT_LEN  = 8

class UpQSocket(QTcpSocket):
    data_ready = pyqtSignal(unicode)
    def __init__(self):
        QTcpSocket.__init__(self)
        self.wait_len = ''
        self.temp = ''
        self.setSocketOption(QTcpSocket.KeepAliveOption, QVariant(1))
        self.readyRead.connect(self.on_ready_read, Qt.QueuedConnection)

    def connectToHost(self, host, port):
        self.temp = ''
        self.wait_len = ''
        QTcpSocket.connectToHost(self, host, port)

    def close(self):
        QTcpSocket.close(self)

    def send(self, data):
        self.writeData('%s|%s' % (len(data), data))

    def on_ready_read(self):
        if self.bytesAvailable():
            data = str(self.readAll())
            while data:
                if not self.wait_len and '|' in data:#new data and new message
                        self.wait_len , data = data.split('|',1)
                        if match('[0-9]+', self.wait_len) and (len(self.wait_len) <= MAX_WAIT_LEN) and data.startswith('{'):#okay, this is normal length
                            self.wait_len = int(self.wait_len)
                            self.temp = data[:self.wait_len]
                            data = data[self.wait_len:]
                        else:#oh, it was crap
                            self.wait_len , self.temp = '',''
                            return
                elif self.wait_len:#okay, not new message, appending
                    tl= int(self.wait_len)-len(self.temp)
                    self.temp+=data[:tl]
                    data=data[tl:]
                elif not self.wait_len and not '|' in data:#crap
                    return
                if self.wait_len and self.wait_len == len(self.temp):#okay, full message
                        self.data_ready.emit(self.temp)
                        self.wait_len ,self.temp = '',''
                        if not data:
                            return
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Влад
  опции профиля:
сообщение 15.12.2011, 13:27
Сообщение #2


Участник
**

Группа: Участник
Сообщений: 146
Регистрация: 20.3.2009
Из: Санкт-Петербург
Пользователь №: 627

Спасибо сказали: 46 раз(а)




Репутация:   8  


Гм. Я правильно понял: у тебя И на серверной стороне, И на клиентской - работают гарантированно однопоточные программы?

Я сталкивался с подобным явлением при передаче по TCP соединению больших блоков данных (большинство блоков >64K, хотя некоторые блоки имели размер всего 1-2K) - на серверной стороне работал многопоточный софт, и в этом софте была ошибка, заключавшаяся в том, что никакой синхронизации между потоками при записи данных в один и тот же сокет программистом не было предусмотрено. В результате, при достижении некоторого уровня загрузки приложения, на клиент начинали бессистемно (иногда) приходить "битые" блоки данных, что рушило работу системы. Именно по описанному тобою сценарию - "предыдущие" и "последующие" куски данных перемешивались...

Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Andrewshkovskii
  опции профиля:
сообщение 15.12.2011, 19:08
Сообщение #3


Активный участник
***

Группа: Участник
Сообщений: 351
Регистрация: 27.12.2008
Пользователь №: 467

Спасибо сказали: 18 раз(а)




Репутация:   1  


Гарантированно однопоточные. Никак не могу понять, почему слот выполняет асинхронно, а не последовательно.
Даже не так - данные не приходят полностью, порядок следования данных нарушается.

Сообщение отредактировал Andrewshkovskii - 15.12.2011, 19:08
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
ssoft
  опции профиля:
сообщение 16.12.2011, 6:42
Сообщение #4


Участник
**

Группа: Участник
Сообщений: 130
Регистрация: 17.2.2010
Из: Москва
Пользователь №: 1470

Спасибо сказали: 30 раз(а)




Репутация:   3  


Могу только пофантазировать пока на тему использования Qt.QueuedConnection, который вызывает слот через очередь а не на прямую, попробуй Qt.DirectConnection. Хотя даже используя Qt.QueuedConnection вызов слота должен быть последовательным.
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Влад
  опции профиля:
сообщение 16.12.2011, 13:01
Сообщение #5


Участник
**

Группа: Участник
Сообщений: 146
Регистрация: 20.3.2009
Из: Санкт-Петербург
Пользователь №: 627

Спасибо сказали: 46 раз(а)




Репутация:   8  


Я думаю, что дело тут не в сокетах и не в TCP, а в банальной ошибке в алгоритме обработки прочитанного из сокета. Честно говоря, я впервые сталкиваюсь с Питоном (да!), но вот это место я написал бы иначе, чем у тебя:
data += str(self.readAll())


Сообщение отредактировал Влад - 16.12.2011, 13:01
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Andrewshkovskii
  опции профиля:
сообщение 17.12.2011, 12:19
Сообщение #6


Активный участник
***

Группа: Участник
Сообщений: 351
Регистрация: 27.12.2008
Пользователь №: 467

Спасибо сказали: 18 раз(а)




Репутация:   1  


Если бы была ошибка в алгоритме, то он бы не работал при обычных условиях.приведенная Вами строчка — преобразование bytearray в строку. а как бы сделали Вы?
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Влад
  опции профиля:
сообщение 19.12.2011, 10:15
Сообщение #7


Участник
**

Группа: Участник
Сообщений: 146
Регистрация: 20.3.2009
Из: Санкт-Петербург
Пользователь №: 627

Спасибо сказали: 46 раз(а)




Репутация:   8  


Ну, вот при обычных условиях - он и не работает :-)) Или ты хочешь сказать, что у тебя условия "необычные"?
Разумеется, я способен разобраться в том, что делает преобразование bytearray в строку. Мое сомнение вызвало то, что, насколько я понимаю, в строке data = str(self.readAll()) затирается старое содержимое data (если там что-то оставалось к моменту чтения!) новыми данными, прочитанными только что из сокета.

Рассмотри такой сценарий работы (use case):

пусть передатчик шлет такие сообщения в соответствии с твоим протоколом (где цифры - валидная длина, а буквы - валидное тело сообщения):
12345|abcdefgh 67890|ijklmnop

на приемной стороне из сокета вынимаются данные и дергается on_ready_read:
12345|ab
cdef
gh67
8
9
0|ijklmnop
Что будет в data после каждого приема данных и будет ли правильно работать твоя функция?

Сообщение отредактировал Влад - 19.12.2011, 12:02
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
PAFOS
  опции профиля:
сообщение 20.12.2011, 8:26
Сообщение #8


Активный участник
***

Группа: Участник
Сообщений: 258
Регистрация: 27.12.2010
Из: Дмитров
Пользователь №: 2309

Спасибо сказали: 29 раз(а)




Репутация:   8  


может быть у переменной есть ограничение на максимальное кол-во символов?
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Andrewshkovskii
  опции профиля:
сообщение 26.12.2011, 13:53
Сообщение #9


Активный участник
***

Группа: Участник
Сообщений: 351
Регистрация: 27.12.2008
Пользователь №: 467

Спасибо сказали: 18 раз(а)




Репутация:   1  


Извините друзья, я лежал в больнице, не мог ответить,только вот приехал.
Цитата(Влад @ 19.12.2011, 10:15) *
Ну, вот при обычных условиях - он и не работает :-)) Или ты хочешь сказать, что у тебя условия "необычные"?
Разумеется, я способен разобраться в том, что делает преобразование bytearray в строку. Мое сомнение вызвало то, что, насколько я понимаю, в строке data = str(self.readAll()) затирается старое содержимое data (если там что-то оставалось к моменту чтения!) новыми данными, прочитанными только что из сокета.

Рассмотри такой сценарий работы (use case):

пусть передатчик шлет такие сообщения в соответствии с твоим протоколом (где цифры - валидная длина, а буквы - валидное тело сообщения):
12345|abcdefgh 67890|ijklmnop

на приемной стороне из сокета вынимаются данные и дергается on_ready_read:
12345|ab
cdef
gh67
8
9
0|ijklmnop
Что будет в data после каждого приема данных и будет ли правильно работать твоя функция?

Естественно data затирается. только это ЛОКАЛЬНАЯ переменная. и для каждого вызова функции она будет своя,т.е. твое предположение верно только для tmp и wait_len , может "затереться" из-за асинхронного вызова.
Так вот, если есть асинхронность вызова функции, или не верный порядок вызова - то весь алгоритм терпит крах. при иных условиях, исключающих вышесказанные все работает прекрасно.

Цитата(PAFOS @ 20.12.2011, 8:26) *
может быть у переменной есть ограничение на максимальное кол-во символов?

Интересная мысль, но нет, проблема не в ограничении.

Сообщение отредактировал Andrewshkovskii - 26.12.2011, 13:56
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение
Andrewshkovskii
  опции профиля:
сообщение 27.12.2011, 11:23
Сообщение #10


Активный участник
***

Группа: Участник
Сообщений: 351
Регистрация: 27.12.2008
Пользователь №: 467

Спасибо сказали: 18 раз(а)




Репутация:   1  


В общем есть идея отказаться от сигнальной системы для сокетов и попробовать переписать под блокирующие вызовы (waitFor..), сокеты эти должны жить в отдельном треде, только для каждого сокета должен быть свой тред.
Перейти в начало страницы
 
Быстрая цитата+Цитировать сообщение

2 страниц V   1 2 >
Быстрый ответОтветить в данную темуНачать новую тему
Теги
Нет тегов для показа


1 чел. читают эту тему (гостей: 1, скрытых пользователей: 0)
Пользователей: 0




RSS Текстовая версия Сейчас: 27.4.2024, 5:58