383 | | chan = self.shared_activity.telepathy_tubes_chan |
384 | | iface = chan[telepathy.CHANNEL_TYPE_TUBES] |
385 | | addr = iface.AcceptStreamTube(tube_id, |
386 | | telepathy.SOCKET_ADDRESS_TYPE_IPV4, |
387 | | telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0, |
388 | | utf8_strings=True) |
389 | | _logger.debug('Accepted stream tube: listening address is %r', addr) |
390 | | # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' |
391 | | assert isinstance(addr, dbus.Struct) |
392 | | assert len(addr) == 2 |
393 | | assert isinstance(addr[0], str) |
394 | | assert isinstance(addr[1], (int, long)) |
395 | | assert addr[1] > 0 and addr[1] < 65536 |
396 | | port = int(addr[1]) |
397 | | |
398 | | getter = ReadURLDownloader("http://%s:%d/document" |
399 | | % (addr[0], port)) |
400 | | getter.connect("finished", self._download_result_cb, tube_id) |
401 | | getter.connect("progress", self._download_progress_cb, tube_id) |
402 | | getter.connect("error", self._download_error_cb, tube_id) |
403 | | _logger.debug("Starting download to %s...", path) |
404 | | getter.start(path) |
405 | | self._download_content_length = getter.get_content_length() |
406 | | self._download_content_type = getter.get_content_type() |
| 399 | |
| 400 | conn = self.shared_activity.telepathy_conn |
| 401 | tube_chan = Channel(conn.dbus_proxy.bus_name, tube_path) |
| 402 | |
| 403 | def accept_stream_tube_reply_cb(addr): |
| 404 | _logger.debug('Accepted stream tube: listening address is %r', addr) |
| 405 | |
| 406 | # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)' |
| 407 | assert isinstance(addr, dbus.Struct) |
| 408 | assert len(addr) == 2 |
| 409 | assert isinstance(addr[0], str) |
| 410 | assert isinstance(addr[1], (int, long)) |
| 411 | assert addr[1] > 0 and addr[1] < 65536 |
| 412 | port = int(addr[1]) |
| 413 | |
| 414 | getter = ReadURLDownloader("http://%s:%d/document" |
| 415 | % (addr[0], port)) |
| 416 | getter.connect("finished", self._download_result_cb, tube_path) |
| 417 | getter.connect("progress", self._download_progress_cb, tube_path) |
| 418 | getter.connect("error", self._download_error_cb, tube_path) |
| 419 | _logger.debug("Starting download to %s...", path) |
| 420 | getter.start(path) |
| 421 | self._download_content_length = getter.get_content_length() |
| 422 | self._download_content_type = getter.get_content_type() |
| 423 | |
| 424 | def accept_stream_tube_error_cb(e): |
| 425 | _logger.error('OfferStreamTube failed: %s' % e) |
| 426 | |
| 427 | addr = tube_chan[CHANNEL_TYPE_STREAM_TUBE].AcceptStreamTube( |
| 428 | SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, 0, |
| 429 | utf8_strings=True, |
| 430 | reply_handler=accept_stream_tube_reply_cb, |
| 431 | error_handler=accept_stream_tube_error_cb) |
| 432 | |
503 | | chan = self.shared_activity.telepathy_tubes_chan |
504 | | iface = chan[telepathy.CHANNEL_TYPE_TUBES] |
505 | | self._fileserver_tube_id = iface.OfferStreamTube(READ_STREAM_SERVICE, |
506 | | {}, |
507 | | telepathy.SOCKET_ADDRESS_TYPE_IPV4, |
| 529 | conn = self.shared_activity.telepathy_conn |
| 530 | room_handle = self.shared_activity.telepathy_room_handle |
| 531 | |
| 532 | def create_tube_reply_cb(path, props): |
| 533 | _logger.debug("tube channel created. Offer the tube") |
| 534 | |
| 535 | self._tube_chan = Channel(conn.dbus_proxy.bus_name, path) |
| 536 | |
| 537 | self._tube_chan[CHANNEL_TYPE_STREAM_TUBE].OfferStreamTube(SOCKET_ADDRESS_TYPE_IPV4, |
509 | | telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0) |
| 539 | SOCKET_ACCESS_CONTROL_LOCALHOST, 0, {}) |
| 540 | |
| 541 | def create_tube_error_cb(e): |
| 542 | _logger.error('Tube channel creation failed: %s' % e) |
| 543 | |
| 544 | conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({ |
| 545 | CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_STREAM_TUBE, |
| 546 | CHANNEL_INTERFACE + ".TargetHandleType": CONNECTION_HANDLE_TYPE_ROOM, |
| 547 | CHANNEL_INTERFACE + ".TargetHandle": room_handle, |
| 548 | CHANNEL_TYPE_STREAM_TUBE + ".Service": READ_STREAM_SERVICE}, |
| 549 | reply_handler=create_tube_reply_cb, |
| 550 | error_handler=create_tube_error_cb) |
513 | | tubes_chan = self.shared_activity.telepathy_tubes_chan |
514 | | |
515 | | tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', |
516 | | self._new_tube_cb) |
517 | | tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes( |
518 | | reply_handler=self._list_tubes_reply_cb, |
519 | | error_handler=self._list_tubes_error_cb) |
520 | | |
521 | | def _new_tube_cb(self, tube_id, initiator, tube_type, service, params, |
522 | | state): |
523 | | """Callback when a new tube becomes available.""" |
524 | | _logger.debug('New tube: ID=%d initator=%d type=%d service=%s ' |
525 | | 'params=%r state=%d', tube_id, initiator, tube_type, |
526 | | service, params, state) |
527 | | if self._document is None and service == READ_STREAM_SERVICE: |
528 | | _logger.debug('I could download from that tube') |
529 | | self.unused_download_tubes.add(tube_id) |
530 | | # if no download is in progress, let's fetch the document |
531 | | if self._want_document: |
532 | | gobject.idle_add(self._get_document) |
533 | | |
534 | | def _list_tubes_reply_cb(self, tubes): |
535 | | """Callback when new tubes are available.""" |
536 | | for tube_info in tubes: |
537 | | self._new_tube_cb(*tube_info) |
538 | | |
539 | | def _list_tubes_error_cb(self, e): |
540 | | """Handle ListTubes error by logging.""" |
541 | | _logger.error('ListTubes() failed: %s', e) |
| 554 | conn = self.shared_activity.telepathy_conn |
| 555 | |
| 556 | conn[CONNECTION_INTERFACE_REQUESTS].connect_to_signal('NewChannels', |
| 557 | self.__new_channels_cb) |
| 558 | |
| 559 | # look for existing tube channels |
| 560 | conn[PROPERTIES_IFACE].Get(CONNECTION_INTERFACE_REQUESTS, 'Channels', |
| 561 | reply_handler=self.__get_channels_reply_cb, |
| 562 | error_handler=self.__get_channels_error_cb) |
| 563 | |
| 564 | def __new_channels_cb(self, channels): |
| 565 | """Callback when a new channel is created. We are interested about the stream tube ones""" |
| 566 | for path, props in channels: |
| 567 | if props[CHANNEL_INTERFACE + ".ChannelType"] != CHANNEL_TYPE_STREAM_TUBE: |
| 568 | continue |
| 569 | |
| 570 | _logger.debug('New tube: initator=%s service=%s', |
| 571 | props[CHANNEL_INTERFACE + '.InitiatorID'], |
| 572 | props[CHANNEL_TYPE_STREAM_TUBE + '.Service']) |
| 573 | |
| 574 | if self._document is None and props[CHANNEL_TYPE_STREAM_TUBE + '.Service'] == READ_STREAM_SERVICE: |
| 575 | _logger.debug('I could download from that tube') |
| 576 | self.unused_download_tubes.add(path) |
| 577 | # if no download is in progress, let's fetch the document |
| 578 | if self._want_document: |
| 579 | gobject.idle_add(self._get_document) |
| 580 | |
| 581 | def __get_channels_reply_cb(self, channels): |
| 582 | """Callback when existing channels are available.""" |
| 583 | self.__new_channels_cb(channels) |
| 584 | |
| 585 | def __get_channels_error_cb(self, e): |
| 586 | """Handle get channels error by logging.""" |
| 587 | _logger.error('Get channels failed: %s', e) |