Ticket #758: tube-new-api.patch

File tube-new-api.patch, 12.2 KB (added by gdesmott, 15 years ago)

readactivity.py | 196 ++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 121 insertions(+), 75 deletions(-)

  • readactivity.py

    diff --git a/readactivity.py b/readactivity.py
    index e2afab5..8e8b09c 100644
    a b import dbus 
    2525import evince
    2626import gobject
    2727import gtk
    28 import telepathy
     28
     29from telepathy.client import Channel
     30from telepathy.interfaces import CONNECTION_INTERFACE_REQUESTS, CHANNEL_INTERFACE
     31from telepathy.constants import CONNECTION_HANDLE_TYPE_ROOM, SOCKET_ADDRESS_TYPE_IPV4,\
     32    SOCKET_ACCESS_CONTROL_LOCALHOST
     33
     34from dbus import PROPERTIES_IFACE
    2935
    3036from sugar.activity import activity
    3137from sugar import network
    _TOOLBAR_READ = 2 
    4450
    4551_logger = logging.getLogger('read-activity')
    4652
     53# TODO: import when tube API is stable
     54CHANNEL_INTERFACE_TUBE = CHANNEL_INTERFACE + ".Interface.Tube.DRAFT"
     55CHANNEL_TYPE_STREAM_TUBE = CHANNEL_INTERFACE + ".Type.StreamTube.DRAFT"
     56
     57TUBE_CHANNEL_STATE_LOCAL_PENDING = 0
     58TUBE_CHANNEL_STATE_REMOTE_PENDING = 1
     59TUBE_CHANNEL_STATE_OPEN = 2
     60TUBE_CHANNEL_STATE_NOT_OFFERED = 3
     61
    4762def _get_screen_dpi():
    4863    xft_dpi = gtk.settings_get_default().get_property('gtk-xft-dpi')
    4964    _logger.debug('Setting dpi to %f', float(xft_dpi / 1024))
    class ReadActivity(activity.Activity): 
    155170        # start with sleep off
    156171        self._sleep_inhibit = True
    157172
     173        # set of paths of tube channel that could be used to download the file
    158174        self.unused_download_tubes = set()
    159175        self._want_document = True
    160176        self._download_content_length = 0
    class ReadActivity(activity.Activity): 
    338354        self._close_requested = True
    339355        return True
    340356
    341     def _download_result_cb(self, getter, tempfile, suggested_name, tube_id):
     357    def _download_result_cb(self, getter, tempfile, suggested_name, tube_path):
    342358        if self._download_content_type == 'text/html':
    343359            # got an error page instead
    344             self._download_error_cb(getter, 'HTTP Error', tube_id)
     360            self._download_error_cb(getter, 'HTTP Error', tube_path)
    345361            return
    346362
    347363        del self.unused_download_tubes
    class ReadActivity(activity.Activity): 
    354370        self._jobject.file_path = file_path
    355371        datastore.write(self._jobject, transfer_ownership=True)
    356372
    357         _logger.debug("Got document %s (%s) from tube %u",
    358                       tempfile, suggested_name, tube_id)
     373        _logger.debug("Got document %s (%s) from tube %s",
     374                      tempfile, suggested_name, tube_path)
    359375        self._load_document("file://%s" % tempfile)
    360376        self.save()
    361377
    362     def _download_progress_cb(self, getter, bytes_downloaded, tube_id):
     378    def _download_progress_cb(self, getter, bytes_downloaded, tube_path):
    363379        # FIXME: Draw a progress bar
    364380        if self._download_content_length > 0:
    365             _logger.debug("Downloaded %u of %u bytes from tube %u...",
     381            _logger.debug("Downloaded %u of %u bytes from tube %s...",
    366382                          bytes_downloaded, self._download_content_length,
    367                           tube_id)
     383                          tube_path)
    368384        else:
    369             _logger.debug("Downloaded %u bytes from tube %u...",
    370                           bytes_downloaded, tube_id)
     385            _logger.debug("Downloaded %u bytes from tube %s...",
     386                          bytes_downloaded, tube_path)
    371387
    372     def _download_error_cb(self, getter, err, tube_id):
    373         _logger.debug("Error getting document from tube %u: %s",
    374                       tube_id, err)
     388    def _download_error_cb(self, getter, err, tube_path):
     389        _logger.debug("Error getting document from tube %s: %s",
     390                      tube_path, err)
    375391        self._want_document = True
    376392        self._download_content_length = 0
    377393        self._download_content_type = None
    378394        gobject.idle_add(self._get_document)
    379395
    380     def _download_document(self, tube_id, path):
     396    def _download_document(self, tube_path, path):
    381397        # FIXME: should ideally have the CM listen on a Unix socket
    382398        # instead of IPv4 (might be more compatible with Rainbow)
    383         chan = self.shared_activity.telepathy_tubes_chan
    384         iface = chan[telepathy.CHANNEL_TYPE_TUBES]
    385         addr = iface.AcceptStreamTube(tube_id,
    386                 telepathy.SOCKET_ADDRESS_TYPE_IPV4,
    387                 telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
    388                 utf8_strings=True)
    389         _logger.debug('Accepted stream tube: listening address is %r', addr)
    390         # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
    391         assert isinstance(addr, dbus.Struct)
    392         assert len(addr) == 2
    393         assert isinstance(addr[0], str)
    394         assert isinstance(addr[1], (int, long))
    395         assert addr[1] > 0 and addr[1] < 65536
    396         port = int(addr[1])
    397 
    398         getter = ReadURLDownloader("http://%s:%d/document"
    399                                            % (addr[0], port))
    400         getter.connect("finished", self._download_result_cb, tube_id)
    401         getter.connect("progress", self._download_progress_cb, tube_id)
    402         getter.connect("error", self._download_error_cb, tube_id)
    403         _logger.debug("Starting download to %s...", path)
    404         getter.start(path)
    405         self._download_content_length = getter.get_content_length()
    406         self._download_content_type = getter.get_content_type()
     399
     400        conn = self.shared_activity.telepathy_conn
     401        tube_chan = Channel(conn.dbus_proxy.bus_name, tube_path)
     402
     403        def accept_stream_tube_reply_cb(addr):
     404            _logger.debug('Accepted stream tube: listening address is %r', addr)
     405
     406            # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
     407            assert isinstance(addr, dbus.Struct)
     408            assert len(addr) == 2
     409            assert isinstance(addr[0], str)
     410            assert isinstance(addr[1], (int, long))
     411            assert addr[1] > 0 and addr[1] < 65536
     412            port = int(addr[1])
     413
     414            getter = ReadURLDownloader("http://%s:%d/document"
     415                                               % (addr[0], port))
     416            getter.connect("finished", self._download_result_cb, tube_path)
     417            getter.connect("progress", self._download_progress_cb, tube_path)
     418            getter.connect("error", self._download_error_cb, tube_path)
     419            _logger.debug("Starting download to %s...", path)
     420            getter.start(path)
     421            self._download_content_length = getter.get_content_length()
     422            self._download_content_type = getter.get_content_type()
     423
     424        def accept_stream_tube_error_cb(e):
     425            _logger.error('OfferStreamTube failed: %s' % e)
     426
     427        addr = tube_chan[CHANNEL_TYPE_STREAM_TUBE].AcceptStreamTube(
     428                SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
     429                utf8_strings=True,
     430                reply_handler=accept_stream_tube_reply_cb,
     431                error_handler=accept_stream_tube_error_cb)
     432
    407433        return False
    408434
    409435    def _get_document(self):
    class ReadActivity(activity.Activity): 
    419445
    420446        # Pick an arbitrary tube we can try to download the document from
    421447        try:
    422             tube_id = self.unused_download_tubes.pop()
     448            tube_path = self.unused_download_tubes.pop()
    423449        except (ValueError, KeyError), e:
    424450            _logger.debug('No tubes to get the document from right now: %s',
    425451                          e)
    class ReadActivity(activity.Activity): 
    427453
    428454        # Avoid trying to download the document multiple times at once
    429455        self._want_document = False
    430         gobject.idle_add(self._download_document, tube_id, path)
     456        gobject.idle_add(self._download_document, tube_path, path)
    431457        return False
    432458
    433459    def _joined_cb(self, also_self):
    class ReadActivity(activity.Activity): 
    500526            self._tempfile)
    501527
    502528        # Make a tube for it
    503         chan = self.shared_activity.telepathy_tubes_chan
    504         iface = chan[telepathy.CHANNEL_TYPE_TUBES]
    505         self._fileserver_tube_id = iface.OfferStreamTube(READ_STREAM_SERVICE,
    506                 {},
    507                 telepathy.SOCKET_ADDRESS_TYPE_IPV4,
     529        conn = self.shared_activity.telepathy_conn
     530        room_handle = self.shared_activity.telepathy_room_handle
     531
     532        def create_tube_reply_cb(path, props):
     533            _logger.debug("tube channel created. Offer the tube")
     534
     535            self._tube_chan = Channel(conn.dbus_proxy.bus_name, path)
     536
     537            self._tube_chan[CHANNEL_TYPE_STREAM_TUBE].OfferStreamTube(SOCKET_ADDRESS_TYPE_IPV4,
    508538                ('127.0.0.1', dbus.UInt16(self.port)),
    509                 telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0)
     539                SOCKET_ACCESS_CONTROL_LOCALHOST, 0, {})
     540
     541        def create_tube_error_cb(e):
     542            _logger.error('Tube channel creation failed: %s' % e)
     543
     544        conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
     545            CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_STREAM_TUBE,
     546            CHANNEL_INTERFACE + ".TargetHandleType": CONNECTION_HANDLE_TYPE_ROOM,
     547            CHANNEL_INTERFACE + ".TargetHandle": room_handle,
     548            CHANNEL_TYPE_STREAM_TUBE + ".Service": READ_STREAM_SERVICE},
     549            reply_handler=create_tube_reply_cb,
     550            error_handler=create_tube_error_cb)
    510551
    511552    def watch_for_tubes(self):
    512553        """Watch for new tubes."""
    513         tubes_chan = self.shared_activity.telepathy_tubes_chan
    514 
    515         tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
    516             self._new_tube_cb)
    517         tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
    518             reply_handler=self._list_tubes_reply_cb,
    519             error_handler=self._list_tubes_error_cb)
    520 
    521     def _new_tube_cb(self, tube_id, initiator, tube_type, service, params,
    522                      state):
    523         """Callback when a new tube becomes available."""
    524         _logger.debug('New tube: ID=%d initator=%d type=%d service=%s '
    525                       'params=%r state=%d', tube_id, initiator, tube_type,
    526                       service, params, state)
    527         if self._document is None and service == READ_STREAM_SERVICE:
    528             _logger.debug('I could download from that tube')
    529             self.unused_download_tubes.add(tube_id)
    530             # if no download is in progress, let's fetch the document
    531             if self._want_document:
    532                 gobject.idle_add(self._get_document)
    533 
    534     def _list_tubes_reply_cb(self, tubes):
    535         """Callback when new tubes are available."""
    536         for tube_info in tubes:
    537             self._new_tube_cb(*tube_info)
    538 
    539     def _list_tubes_error_cb(self, e):
    540         """Handle ListTubes error by logging."""
    541         _logger.error('ListTubes() failed: %s', e)
     554        conn = self.shared_activity.telepathy_conn
     555
     556        conn[CONNECTION_INTERFACE_REQUESTS].connect_to_signal('NewChannels',
     557            self.__new_channels_cb)
     558
     559        # look for existing tube channels
     560        conn[PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_REQUESTS, 'Channels',
     561            reply_handler=self.__get_channels_reply_cb,
     562            error_handler=self.__get_channels_error_cb)
     563
     564    def __new_channels_cb(self, channels):
     565        """Callback when a new channel is created. We are interested about the stream tube ones"""
     566        for path, props in channels:
     567            if props[CHANNEL_INTERFACE + ".ChannelType"] != CHANNEL_TYPE_STREAM_TUBE:
     568                continue
     569
     570            _logger.debug('New tube: initator=%s service=%s',
     571                props[CHANNEL_INTERFACE + '.InitiatorID'],
     572                props[CHANNEL_TYPE_STREAM_TUBE + '.Service'])
     573
     574            if self._document is None and props[CHANNEL_TYPE_STREAM_TUBE + '.Service'] == READ_STREAM_SERVICE:
     575                _logger.debug('I could download from that tube')
     576                self.unused_download_tubes.add(path)
     577                # if no download is in progress, let's fetch the document
     578                if self._want_document:
     579                    gobject.idle_add(self._get_document)
     580
     581    def __get_channels_reply_cb(self, channels):
     582        """Callback when existing channels are available."""
     583        self.__new_channels_cb(channels)
     584
     585    def __get_channels_error_cb(self, e):
     586        """Handle get channels error by logging."""
     587        _logger.error('Get channels failed: %s', e)
    542588
    543589    def _shared_cb(self, activityid):
    544590        """Callback when activity shared.