387 | | chan = self.shared_activity.telepathy_tubes_chan |
388 | | iface = chan[telepathy.CHANNEL_TYPE_TUBES] |
389 | | addr = iface.AcceptStreamTube(tube_id, |
390 | | telepathy.SOCKET_ADDRESS_TYPE_IPV4, |
391 | | telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0, |
392 | | utf8_strings=True) |
393 | | _logger.debug('Accepted stream tube: listening address is %r', addr) |
394 | | # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' |
395 | | assert isinstance(addr, dbus.Struct) |
396 | | assert len(addr) == 2 |
397 | | assert isinstance(addr[0], str) |
398 | | assert isinstance(addr[1], (int, long)) |
399 | | assert addr[1] > 0 and addr[1] < 65536 |
400 | | port = int(addr[1]) |
401 | | |
402 | | getter = ReadURLDownloader("http://%s:%d/document" |
403 | | % (addr[0], port)) |
404 | | getter.connect("finished", self._download_result_cb, tube_id) |
405 | | getter.connect("progress", self._download_progress_cb, tube_id) |
406 | | getter.connect("error", self._download_error_cb, tube_id) |
407 | | _logger.debug("Starting download to %s...", path) |
408 | | getter.start(path) |
409 | | self._download_content_length = getter.get_content_length() |
410 | | self._download_content_type = getter.get_content_type() |
| 395 | |
| 396 | conn = self.shared_activity.telepathy_conn |
| 397 | tube_chan = Channel(conn.dbus_proxy.bus_name, tube_path) |
| 398 | |
| 399 | def accept_stream_tube_reply_cb(addr): |
| 400 | _logger.debug('Accepted stream tube: listening address is %r', addr) |
| 401 | |
| 402 | # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' |
| 403 | assert isinstance(addr, dbus.Struct) |
| 404 | assert len(addr) == 2 |
| 405 | assert isinstance(addr[0], str) |
| 406 | assert isinstance(addr[1], (int, long)) |
| 407 | assert addr[1] > 0 and addr[1] < 65536 |
| 408 | port = int(addr[1]) |
| 409 | |
| 410 | getter = ReadURLDownloader("http://%s:%d/document" |
| 411 | % (addr[0], port)) |
| 412 | getter.connect("finished", self._download_result_cb, tube_path) |
| 413 | getter.connect("progress", self._download_progress_cb, tube_path) |
| 414 | getter.connect("error", self._download_error_cb, tube_path) |
| 415 | _logger.debug("Starting download to %s...", path) |
| 416 | getter.start(path) |
| 417 | self._download_content_length = getter.get_content_length() |
| 418 | self._download_content_type = getter.get_content_type() |
| 419 | |
| 420 | def accept_stream_tube_error_cb(e): |
| 421 | _logger.error('OfferStreamTube failed: %s' % e) |
| 422 | |
| 423 | addr = tube_chan[CHANNEL_TYPE_STREAM_TUBE].Accept( |
| 424 | SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, 0, |
| 425 | utf8_strings=True, |
| 426 | reply_handler=accept_stream_tube_reply_cb, |
| 427 | error_handler=accept_stream_tube_error_cb) |
| 428 | |
507 | | chan = self.shared_activity.telepathy_tubes_chan |
508 | | iface = chan[telepathy.CHANNEL_TYPE_TUBES] |
509 | | self._fileserver_tube_id = iface.OfferStreamTube(READ_STREAM_SERVICE, |
510 | | {}, |
511 | | telepathy.SOCKET_ADDRESS_TYPE_IPV4, |
| 525 | conn = self.shared_activity.telepathy_conn |
| 526 | room_handle = self.shared_activity.telepathy_room_handle |
| 527 | |
| 528 | def create_tube_reply_cb(path, props): |
| 529 | _logger.debug("tube channel created. Offer the tube") |
| 530 | |
| 531 | self._tube_chan = Channel(conn.dbus_proxy.bus_name, path) |
| 532 | |
| 533 | self._tube_chan[CHANNEL_TYPE_STREAM_TUBE].Offer(SOCKET_ADDRESS_TYPE_IPV4, |
513 | | telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0) |
| 535 | SOCKET_ACCESS_CONTROL_LOCALHOST, {}) |
| 536 | |
| 537 | def create_tube_error_cb(e): |
| 538 | _logger.error('Tube channel creation failed: %s' % e) |
| 539 | |
| 540 | conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({ |
| 541 | CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_STREAM_TUBE, |
| 542 | CHANNEL_INTERFACE + ".TargetHandleType": CONNECTION_HANDLE_TYPE_ROOM, |
| 543 | CHANNEL_INTERFACE + ".TargetHandle": room_handle, |
| 544 | CHANNEL_TYPE_STREAM_TUBE + ".Service": READ_STREAM_SERVICE}, |
| 545 | reply_handler=create_tube_reply_cb, |
| 546 | error_handler=create_tube_error_cb) |
517 | | tubes_chan = self.shared_activity.telepathy_tubes_chan |
518 | | |
519 | | tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', |
520 | | self._new_tube_cb) |
521 | | tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes( |
522 | | reply_handler=self._list_tubes_reply_cb, |
523 | | error_handler=self._list_tubes_error_cb) |
524 | | |
525 | | def _new_tube_cb(self, tube_id, initiator, tube_type, service, params, |
526 | | state): |
527 | | """Callback when a new tube becomes available.""" |
528 | | _logger.debug('New tube: ID=%d initator=%d type=%d service=%s ' |
529 | | 'params=%r state=%d', tube_id, initiator, tube_type, |
530 | | service, params, state) |
531 | | if self._document is None and service == READ_STREAM_SERVICE: |
532 | | _logger.debug('I could download from that tube') |
533 | | self.unused_download_tubes.add(tube_id) |
534 | | # if no download is in progress, let's fetch the document |
535 | | if self._want_document: |
536 | | gobject.idle_add(self._get_document) |
537 | | |
538 | | def _list_tubes_reply_cb(self, tubes): |
539 | | """Callback when new tubes are available.""" |
540 | | for tube_info in tubes: |
541 | | self._new_tube_cb(*tube_info) |
542 | | |
543 | | def _list_tubes_error_cb(self, e): |
544 | | """Handle ListTubes error by logging.""" |
545 | | _logger.error('ListTubes() failed: %s', e) |
| 550 | conn = self.shared_activity.telepathy_conn |
| 551 | |
| 552 | conn[CONNECTION_INTERFACE_REQUESTS].connect_to_signal('NewChannels', |
| 553 | self.__new_channels_cb) |
| 554 | |
| 555 | # look for existing tube channels |
| 556 | conn[PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_REQUESTS, 'Channels', |
| 557 | reply_handler=self.__get_channels_reply_cb, |
| 558 | error_handler=self.__get_channels_error_cb) |
| 559 | |
| 560 | def __new_channels_cb(self, channels): |
| 561 | """Callback when a new channel is created. We are interested about the stream tube ones""" |
| 562 | for path, props in channels: |
| 563 | if props[CHANNEL_INTERFACE + ".ChannelType"] != CHANNEL_TYPE_STREAM_TUBE: |
| 564 | continue |
| 565 | |
| 566 | _logger.debug('New tube: initator=%s service=%s', |
| 567 | props[CHANNEL_INTERFACE + '.InitiatorID'], |
| 568 | props[CHANNEL_TYPE_STREAM_TUBE + '.Service']) |
| 569 | |
| 570 | if self._document is None and props[CHANNEL_TYPE_STREAM_TUBE + '.Service'] == READ_STREAM_SERVICE: |
| 571 | _logger.debug('I could download from that tube') |
| 572 | self.unused_download_tubes.add(path) |
| 573 | # if no download is in progress, let's fetch the document |
| 574 | if self._want_document: |
| 575 | gobject.idle_add(self._get_document) |
| 576 | |
| 577 | def __get_channels_reply_cb(self, channels): |
| 578 | """Callback when existing channels are available.""" |
| 579 | self.__new_channels_cb(channels) |
| 580 | |
| 581 | def __get_channels_error_cb(self, e): |
| 582 | """Handle get channels error by logging.""" |
| 583 | _logger.error('Get channels failed: %s', e) |