Ticket #758: tube-stable-api.patch

File tube-stable-api.patch, 11.6 KB (added by gdesmott, 15 years ago)
  • readactivity.py

    diff --git a/readactivity.py b/readactivity.py
    index 75e847e..ac97af2 100644
    a b import dbus 
    2525import evince
    2626import gobject
    2727import gtk
    28 import telepathy
     28
     29from telepathy.client import Channel
     30from telepathy.interfaces import CONNECTION_INTERFACE_REQUESTS, CHANNEL_INTERFACE,\
     31    CHANNEL_INTERFACE_TUBE, CHANNEL_TYPE_STREAM_TUBE
     32from telepathy.constants import CONNECTION_HANDLE_TYPE_ROOM, SOCKET_ADDRESS_TYPE_IPV4,\
     33    SOCKET_ACCESS_CONTROL_LOCALHOST
     34
     35from dbus import PROPERTIES_IFACE
    2936
    3037from sugar.activity import activity
    3138from sugar import network
    class ReadActivity(activity.Activity): 
    159166        # start with sleep off
    160167        self._sleep_inhibit = True
    161168
     169        # set of paths of tube channel that could be used to download the file
    162170        self.unused_download_tubes = set()
    163171        self._want_document = True
    164172        self._download_content_length = 0
    class ReadActivity(activity.Activity): 
    342350        self._close_requested = True
    343351        return True
    344352
    345     def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
     353    def _download_result_cb(self, getter, tempfile, suggested_name, tube_path):
    346354        if self._download_content_type == 'text/html':
    347355            # got an error page instead
    348             self._download_error_cb(getter, 'HTTP Error', tube_id)
     356            self._download_error_cb(getter, 'HTTP Error', tube_path)
    349357            return
    350358
    351359        del self.unused_download_tubes
    class ReadActivity(activity.Activity): 
    358366        self._jobject.file_path = file_path
    359367        datastore.write(self._jobject, transfer_ownership=True)
    360368
    361         _logger.debug("Got document %s (%s) from tube %u",
    362                       tempfile, suggested_name, tube_id)
     369        _logger.debug("Got document %s (%s) from tube %s",
     370                      tempfile, suggested_name, tube_path)
    363371        self._load_document("file://%s" % tempfile)
    364372        self.save()
    365373
    366     def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
     374    def _download_progress_cb(self, getter, bytes_downloaded, tube_path):
    367375        # FIXME: Draw a progress bar
    368376        if self._download_content_length > 0:
    369             _logger.debug("Downloaded %u of %u bytes from tube %u...",
     377            _logger.debug("Downloaded %u of %u bytes from tube %s...",
    370378                          bytes_downloaded, self._download_content_length,
    371                           tube_id)
     379                          tube_path)
    372380        else:
    373             _logger.debug("Downloaded %u bytes from tube %u...",
    374                           bytes_downloaded, tube_id)
     381            _logger.debug("Downloaded %u bytes from tube %s...",
     382                          bytes_downloaded, tube_path)
    375383
    376     def _download_error_cb(self, getter, err, tube_id):
    377         _logger.debug("Error getting document from tube %u: %s",
    378                       tube_id, err)
     384    def _download_error_cb(self, getter, err, tube_path):
     385        _logger.debug("Error getting document from tube %s: %s",
     386                      tube_path, err)
    379387        self._want_document = True
    380388        self._download_content_length = 0
    381389        self._download_content_type = None
    382390        gobject.idle_add(self._get_document)
    383391
    384     def _download_document(self, tube_id, path):
     392    def _download_document(self, tube_path, path):
    385393        # FIXME: should ideally have the CM listen on a Unix socket
    386394        # instead of IPv4 (might be more compatible with Rainbow)
    387         chan = self.shared_activity.telepathy_tubes_chan
    388         iface = chan[telepathy.CHANNEL_TYPE_TUBES]
    389         addr = iface.AcceptStreamTube(tube_id,
    390                 telepathy.SOCKET_ADDRESS_TYPE_IPV4,
    391                 telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
    392                 utf8_strings=True)
    393         _logger.debug('Accepted stream tube: listening address is %r', addr)
    394         # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
    395         assert isinstance(addr, dbus.Struct)
    396         assert len(addr) == 2
    397         assert isinstance(addr[0], str)
    398         assert isinstance(addr[1], (int, long))
    399         assert addr[1] > 0 and addr[1] < 65536
    400         port = int(addr[1])
    401 
    402         getter = ReadURLDownloader("http://%s:%d/document"
    403                                            % (addr[0], port))
    404         getter.connect("finished", self._download_result_cb, tube_id)
    405         getter.connect("progress", self._download_progress_cb, tube_id)
    406         getter.connect("error", self._download_error_cb, tube_id)
    407         _logger.debug("Starting download to %s...", path)
    408         getter.start(path)
    409         self._download_content_length = getter.get_content_length()
    410         self._download_content_type = getter.get_content_type()
     395
     396        conn = self.shared_activity.telepathy_conn
     397        tube_chan = Channel(conn.dbus_proxy.bus_name, tube_path)
     398
     399        def accept_stream_tube_reply_cb(addr):
     400            _logger.debug('Accepted stream tube: listening address is %r', addr)
     401
     402            # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
     403            assert isinstance(addr, dbus.Struct)
     404            assert len(addr) == 2
     405            assert isinstance(addr[0], str)
     406            assert isinstance(addr[1], (int, long))
     407            assert addr[1] > 0 and addr[1] < 65536
     408            port = int(addr[1])
     409
     410            getter = ReadURLDownloader("http://%s:%d/document"
     411                                               % (addr[0], port))
     412            getter.connect("finished", self._download_result_cb, tube_path)
     413            getter.connect("progress", self._download_progress_cb, tube_path)
     414            getter.connect("error", self._download_error_cb, tube_path)
     415            _logger.debug("Starting download to %s...", path)
     416            getter.start(path)
     417            self._download_content_length = getter.get_content_length()
     418            self._download_content_type = getter.get_content_type()
     419
     420        def accept_stream_tube_error_cb(e):
     421            _logger.error('OfferStreamTube failed: %s' % e)
     422
     423        addr = tube_chan[CHANNEL_TYPE_STREAM_TUBE].Accept(
     424                SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
     425                utf8_strings=True,
     426                reply_handler=accept_stream_tube_reply_cb,
     427                error_handler=accept_stream_tube_error_cb)
     428
    411429        return False
    412430
    413431    def _get_document(self):
    class ReadActivity(activity.Activity): 
    423441
    424442        # Pick an arbitrary tube we can try to download the document from
    425443        try:
    426             tube_id = self.unused_download_tubes.pop()
     444            tube_path = self.unused_download_tubes.pop()
    427445        except (ValueError, KeyError), e:
    428446            _logger.debug('No tubes to get the document from right now: %s',
    429447                          e)
    class ReadActivity(activity.Activity): 
    431449
    432450        # Avoid trying to download the document multiple times at once
    433451        self._want_document = False
    434         gobject.idle_add(self._download_document, tube_id, path)
     452        gobject.idle_add(self._download_document, tube_path, path)
    435453        return False
    436454
    437455    def _joined_cb(self, also_self):
    class ReadActivity(activity.Activity): 
    504522            self._tempfile)
    505523
    506524        # Make a tube for it
    507         chan = self.shared_activity.telepathy_tubes_chan
    508         iface = chan[telepathy.CHANNEL_TYPE_TUBES]
    509         self._fileserver_tube_id = iface.OfferStreamTube(READ_STREAM_SERVICE,
    510                 {},
    511                 telepathy.SOCKET_ADDRESS_TYPE_IPV4,
     525        conn = self.shared_activity.telepathy_conn
     526        room_handle = self.shared_activity.telepathy_room_handle
     527
     528        def create_tube_reply_cb(path, props):
     529            _logger.debug("tube channel created. Offer the tube")
     530
     531            self._tube_chan = Channel(conn.dbus_proxy.bus_name, path)
     532
     533            self._tube_chan[CHANNEL_TYPE_STREAM_TUBE].Offer(SOCKET_ADDRESS_TYPE_IPV4,
    512534                ('127.0.0.1', dbus.UInt16(self.port)),
    513                 telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0)
     535                SOCKET_ACCESS_CONTROL_LOCALHOST, {})
     536
     537        def create_tube_error_cb(e):
     538            _logger.error('Tube channel creation failed: %s' % e)
     539
     540        conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
     541            CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_STREAM_TUBE,
     542            CHANNEL_INTERFACE + ".TargetHandleType": CONNECTION_HANDLE_TYPE_ROOM,
     543            CHANNEL_INTERFACE + ".TargetHandle": room_handle,
     544            CHANNEL_TYPE_STREAM_TUBE + ".Service": READ_STREAM_SERVICE},
     545            reply_handler=create_tube_reply_cb,
     546            error_handler=create_tube_error_cb)
    514547
    515548    def watch_for_tubes(self):
    516549        """Watch for new tubes."""
    517         tubes_chan = self.shared_activity.telepathy_tubes_chan
    518 
    519         tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
    520             self._new_tube_cb)
    521         tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
    522             reply_handler=self._list_tubes_reply_cb,
    523             error_handler=self._list_tubes_error_cb)
    524 
    525     def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
    526                      state):
    527         """Callback when a new tube becomes available."""
    528         _logger.debug('New tube: ID=%d initator=%d type=%d service=%s '
    529                       'params=%r state=%d', tube_id, initiator, tube_type,
    530                       service, params, state)
    531         if self._document is None and service == READ_STREAM_SERVICE:
    532             _logger.debug('I could download from that tube')
    533             self.unused_download_tubes.add(tube_id)
    534             # if no download is in progress, let's fetch the document
    535             if self._want_document:
    536                 gobject.idle_add(self._get_document)
    537 
    538     def _list_tubes_reply_cb(self, tubes):
    539         """Callback when new tubes are available."""
    540         for tube_info in tubes:
    541             self._new_tube_cb(*tube_info)
    542 
    543     def _list_tubes_error_cb(self, e):
    544         """Handle ListTubes error by logging."""
    545         _logger.error('ListTubes() failed: %s', e)
     550        conn = self.shared_activity.telepathy_conn
     551
     552        conn[CONNECTION_INTERFACE_REQUESTS].connect_to_signal('NewChannels',
     553            self.__new_channels_cb)
     554
     555        # look for existing tube channels
     556        conn[PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_REQUESTS, 'Channels',
     557            reply_handler=self.__get_channels_reply_cb,
     558            error_handler=self.__get_channels_error_cb)
     559
     560    def __new_channels_cb(self, channels):
     561        """Callback when a new channel is created. We are interested about the stream tube ones"""
     562        for path, props in channels:
     563            if props[CHANNEL_INTERFACE + ".ChannelType"] != CHANNEL_TYPE_STREAM_TUBE:
     564                continue
     565
     566            _logger.debug('New tube: initator=%s service=%s',
     567                props[CHANNEL_INTERFACE + '.InitiatorID'],
     568                props[CHANNEL_TYPE_STREAM_TUBE + '.Service'])
     569
     570            if self._document is None and props[CHANNEL_TYPE_STREAM_TUBE + '.Service'] == READ_STREAM_SERVICE:
     571                _logger.debug('I could download from that tube')
     572                self.unused_download_tubes.add(path)
     573                # if no download is in progress, let's fetch the document
     574                if self._want_document:
     575                    gobject.idle_add(self._get_document)
     576
     577    def __get_channels_reply_cb(self, channels):
     578        """Callback when existing channels are available."""
     579        self.__new_channels_cb(channels)
     580
     581    def __get_channels_error_cb(self, e):
     582        """Handle get channels error by logging."""
     583        _logger.error('Get channels failed: %s', e)
    546584
    547585    def _shared_cb(self, activityid):
    548586        """Callback when activity shared.