Twisted
Материал взят из книги : Twisted Network Programming Essentials , автор - Abe Fettig
Твистед - кросс-платформенная сетевая библиотека , написанная на питоне , вследствие чего она работает
везде , где есть питон . Это асинхронный фреймворк , который избавляет вас от небходимости использовать потоки.
Я кстати лично в этом убеждался не один раз : если в питоне ты создаешь слушающий сокет и не задумываешься
о том , что он может что-то блокировать , то в си приходится городить много-поточность и много чего еще .
Твистед поддерживает работу с mail, web, news, chat, DNS, SSH, Telnet, RPC, и т.д.
Твистед позволяет сделать многое , вплоть до реализации собственного протокола .
Инсталляция
Многие линуксовые дистрибутивы уже включают в себя твистед , и самый простой вариант его установки -
это поставить его из локального репозитария . В книге рекомендуется делать так :
Исходники твистеда лежат на http://twistedmatrix.com/projects/core/
Потом можно поставить дополнительно две библиотеки : PyOpenSSL и PyCrypto.
После инсталляции нужно проверить , видит ли его питон : запустите интерпретатор питона и наберите
>>> import twisted
Можно проинсталлировать твистед из исходников : нужно будет в архиве исходников найти файл ZopeInterface ,
разархивировать его и запустить вначале инсталляцию zope.interface . После чего уже устанавливается сам твистед.
Клиент-сервер
Твистед - фреймворк , в основе которого лежат события - event . Работой таких событий управляют
специальные функции , называемые event handler . Работой самих этих функций управляет другая функция -
т.н. event loop . Она крутится постоянно , отлавливает события , после чего запускает соответствующие хэндлеры.
После чего кошмар с последовательной обработкой событий заканчивается : ваша программа начинает работать
как бы сама по себе , а все сетевые события как бы остаются за кадром и ничего не тормозят .
За работу event loop в твистеде отвечает обьект , называемый reactor , который лежит в модуле
twisted.internet . Для его запуска нужно вызвать команду :
reactor.run()
У реактора есть метод - callLater , который позволяет делать запуск функций по расписанию , или по таймеру.
Рассмотрим пример :
from twisted.internet import reactor
import time
def printTime( ):
print "Current time is", time.strftime("%H:%M:%S")
def stopReactor( ):
print "Stopping reactor"
reactor.stop( )
reactor.callLater(1, printTime)
reactor.callLater(2, printTime)
reactor.callLater(3, printTime)
reactor.callLater(4, printTime)
reactor.callLater(5, stopReactor)
print "Running the reactor..."
reactor.run( )
print "Reactor stopped."
Ее вывод будет таким :
Running the reactor...
Current time is 10:33:44
Current time is 10:33:45
Current time is 10:33:46
Current time is 10:33:47
Stopping reactor
Reactor stopped.
Перейдем к TCP . Ниже приведен пример , который устанавливает коннект с удаленным сервером .
Коннект создается методом reactor.connectTCP() , у которого три параметра . Третий параметр -
обьект ClientFactory , который создает другой обьект - Protocol - который будет управлять
потоком данных между клиентом и сервером .
from twisted.internet import reactor, protocol
class QuickDisconnectProtocol(protocol.Protocol):
def connectionMade(self):
print "Connected to %s." % self.transport.getPeer( ).host
self.transport.loseConnection( )
class BasicClientFactory(protocol.ClientFactory):
protocol = QuickDisconnectProtocol
def clientConnectionLost(self, connector, reason):
print "Lost connection: %s" % reason.getErrorMessage( )
reactor.stop( )
def clientConnectionFailed(self, connector, reason):
print "Connection failed: %s" % reason.getErrorMessage( )
reactor.stop( )
reactor.connectTCP('www.google.com', 80, BasicClientFactory( ))
reactor.run( )
После запуска этой программы мы должны получить :
Connected to www.google.com.
Lost connection: Connection was closed cleanly.
Есть 2 главных базовых класса - ClientFactory и Protocol. Они обрабатываю все возможные события ,
связанные с коннектом . Для каждого коннекта будет создан свой обьект класса Protocol.
После обьекта reactor вторым по значимости обьектом в твистед возможно является обьект Deferred.
Он позволяет управлять последовательностью асинхронных событий , параллельно выполняя какую-то другую работу .
Рассмотрим это на примере , где будет асинхронная проверка на коннект . Асинхронная функция может вернуть ошибку,
в этом случае для обработки нужно использовать метод Deferred.addErrback
from twisted.internet import reactor, defer, protocol
class CallbackAndDisconnectProtocol(protocol.Protocol):
def connectionMade(self):
self.factory.deferred.callback("Connected!")
self.transport.loseConnection( )
class ConnectionTestFactory(protocol.ClientFactory):
protocol = CallbackAndDisconnectProtocol
def __init__(self):
self.deferred = defer.Deferred( )
def clientConnectionFailed(self, connector, reason):
self.deferred.errback(reason)
def testConnect(host, port):
testFactory = ConnectionTestFactory( )
reactor.connectTCP(host, port, testFactory)
return testFactory.deferred
def handleSuccess(result, port):
print "Connected to port %i" % port
reactor.stop( )
def handleFailure(failure, port):
print "Error connecting to port %i: %s" % (
port, failure.getErrorMessage( ))
reactor.stop( )
if __name__ == "__main__":
import sys
if not len(sys.argv) == 3:
print "Usage: connectiontest.py host port"
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2])
connecting = testConnect(host, port)
connecting.addCallback(handleSuccess, port)
connecting.addErrback(handleFailure, port)
reactor.run( )
Скрипту нужно передать 2 параметра - имя сервера и порт.
У обьекта ClientFactory есть атрибут Deferred , который позволяет обработать ошибку и повесить
на событие наш собственный метод handleSuccess , либо handleFailure.
У обьекта Deferred есть расширение - deferredList , который позволяет создать группу атрибутов Deferred ,
например , для сканирования на коннект нескольких портов . Следующий скрипт сканирует хост в диапазоне портов 1-200:
from twisted.internet import reactor, defer
from connectiontester import testConnect
def handleAllResults(results, ports):
for port, resultInfo in zip(ports, results):
success, result = resultInfo
if success:
print "Connected to port %i" % port
reactor.stop( )
import sys
host = sys.argv[1]
ports = range(1, 201)
testers = [testConnect(host, port) for port in ports]
defer.DeferredList(testers, consumeErrors=True).addCallback(handleAllResults, ports)
reactor.run( )
После установки коннекта потоками данных управляет обьект Protocol . Метод dataReceived обрабатывает
входящий поток , отсылает данные метод self.transport.write.
Напишем простой эхо-сервер - сначала клиент :
from twisted.internet import stdio, reactor, protocol
from twisted.protocols import basic
import re
class DataForwardingProtocol(protocol.Protocol):
def __init__(self):
self.output = None
self.normalizeNewlines = False
def dataReceived(self, data):
if self.normalizeNewlines:
data = re.sub(r"(\r\n|\n)", "\r\n", data)
if self.output:
self.output.write(data)
class StdioProxyProtocol(DataForwardingProtocol):
def connectionMade(self):
inputForwarder = DataForwardingProtocol( )
inputForwarder.output = self.transport
inputForwarder.normalizeNewlines = True
stdioWrapper = stdio.StandardIO(inputForwarder)
self.output = stdioWrapper
print "Connected to server. Press ctrl-C to close connection."
class StdioProxyFactory(protocol.ClientFactory):
protocol = StdioProxyProtocol
def clientConnectionLost(self, transport, reason):
reactor.stop( )
def clientConnectionFailed(self, transport, reason):
print reason.getErrorMessage( )
reactor.stop( )
if __name__ == '__main__':
import sys
if not len(sys.argv) == 3:
print "Usage: %s host port" % __file__
sys.exit(1)
reactor.connectTCP(sys.argv[1], int(sys.argv[2]), StdioProxyFactory( ))
reactor.run( )
Клиенту нужно передать имя локал-хоста и порт 5001. Теперь сервер :
from twisted.internet import reactor, protocol
from twisted.protocols import basic
class EchoProtocol(basic.LineReceiver):
def lineReceived(self, line):
if line == 'quit':
self.sendLine("Goodbye.")
self.transport.loseConnection( )
else:
self.sendLine("You said: " + line)
class EchoServerFactory(protocol.ServerFactory):
protocol = EchoProtocol
if __name__ == "__main__":
port = 5001
reactor.listenTCP(port, EchoServerFactory( ))
reactor.run( )
После того , как клиент установит коннект с сервером , набираем в клиенте строку , посылаем , и получаем ответ .
Download
Напишем монитор для загрузки файла. twisted.web.client не дает достаточного функционала , поэтому
мы используем client.HTTPDownloader для создания собственного загрузчика , который будет грузить
веб-страницу .
from twisted.web import client
class HTTPProgressDownloader(client.HTTPDownloader):
def gotHeaders(self, headers):
if self.status == '200': # page data is on the way
if headers.has_key('content-length'):
self.totalLength = int(headers['content-length'][0])
else:
self.totalLength = 0
self.currentLength = 0.0
print ''
return client.HTTPDownloader.gotHeaders(self, headers)
def pagePart(self, data):
if self.status == '200':
self.currentLength += len(data)
if self.totalLength:
percent = "%i%%" % (
(self.currentLength/self.totalLength)*100)
else:
percent = '%dK' % (self.currentLength/1000)
print "\033[1FProgress: " + percent
return client.HTTPDownloader.pagePart(self, data)
def downloadWithProgress(url, file, contextFactory=None, *args, **kwargs):
scheme, host, port, path = client._parse(url)
factory = HTTPProgressDownloader(url, file, *args, **kwargs)
if scheme == 'https':
from twisted.internet import ssl
if contextFactory is None:
contextFactory = ssl.ClientContextFactory( )
reactor.connectSSL(host, port, factory, contextFactory)
else:
reactor.connectTCP(host, port, factory)
return factory.deferred
if __name__ == "__main__":
import sys
from twisted.internet import reactor
def downloadComplete(result):
print "Download Complete."
reactor.stop( )
def downloadError(failure):
print "Error:", failure.getErrorMessage( )
reactor.stop( )
url, outputFile = sys.argv[1:]
downloadWithProgress(url, outputFile).addCallback(
downloadComplete).addErrback(
downloadError)
reactor.run( )
Запускать скрипт надо с параметрами :
python webdownload.py http://www.oreilly.com/ oreilly.html
Progress: 100% <- updated during the download
Download Complete.
Мы перегружаем базовый метод gotHeaders для проверки хидера Content-Length ,
а также базовый метод pagePart , работающий во время закачки .
Каждый раз при поступлении новой порции данных HTTPProgressDownloader будет печатать процент .
Twistd
В твистеде есть программа , называемая twistd , которая позволяет запустить скрипт в качестве демона.
Twistd позволяет логировать события , давать ограничения на права .
Сначала напишем простой слушающий эхо-сервер reverse.py , который возвращает реверс полученной от клиента строки:
from twisted.application import service, internet
from twisted.internet import protocol, reactor
from twisted.protocols import basic
def reverse(string):
return string[::-1]
class ReverserProtocol(basic.LineReceiver):
def lineReceived(self, line):
if hasattr(self, 'handle_' + line):
getattr(self, 'handle_' + line)( )
else:
self.sendLine(reverse(line))
def handle_quit(self):
self.transport.loseConnection( )
class ReverserFactory(protocol.ServerFactory):
protocol = ReverserProtocol
class ReverserService(internet.TCPServer):
def __init__(self):
internet.TCPServer.__init__(self, 2323, ReverserFactory( ))
Теперь напишем для него другой управляющий скрипт reverse_app.py
from twisted.application import service
import reverse
application = service.Application("Reverser")
reverserService = reverse.ReverserService( )
reverserService.setServiceParent(application)
и запустим его :
twistd -y reverse_app.py
Демон запущен . Теперь запустим телнет :
telnet localhost 2323
Trying 127.0.0.1...
Connected to sparky.
Escape character is '^]'.
hello world!
!dlrow olleh
quit
|
сергей | Папа 2010-08-22 22:03:54 | |
|