# Ticket #4242: filetransfer.py
#
# File filetransfer.py, 14.1 KB (added by ajay_garg, 11 years ago)
#
# Copyright (C) 2008 Tomeu Vizoso
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
16
import os
import logging
import socket

from gi.repository import GObject
from gi.repository import Gio
from gi.repository import GLib
import dbus
from telepathy.interfaces import CONNECTION_INTERFACE_REQUESTS, CHANNEL
from telepathy.constants import CONNECTION_HANDLE_TYPE_CONTACT,     \
                                SOCKET_ADDRESS_TYPE_UNIX,           \
                                SOCKET_ACCESS_CONTROL_LOCALHOST
from telepathy.client import Connection, Channel

from sugar3.presence import presenceservice
from sugar3 import dispatch

from jarabe.util.telepathy import connection_watcher
from jarabe.model import neighborhood
36
# Avoid "Fatal Python error: GC object already tracked"
# http://stackoverflow.com/questions/7496629/gstreamer-appsrc-causes-random-crashes
GObject.threads_init()
40
# Values of the Telepathy File_Transfer_State enum, as reported by the
# channel's 'State' property and 'FileTransferStateChanged' signal.
FT_STATE_NONE = 0
FT_STATE_PENDING = 1
FT_STATE_ACCEPTED = 2
FT_STATE_OPEN = 3
FT_STATE_COMPLETED = 4
FT_STATE_CANCELLED = 5

# Values of the Telepathy File_Transfer_State_Change_Reason enum, as
# reported alongside the state by 'FileTransferStateChanged'.
# The enum is contiguous: Local_Error is 4 and Remote_Error is 5.
FT_REASON_NONE = 0
FT_REASON_REQUESTED = 1
FT_REASON_LOCAL_STOPPED = 2
FT_REASON_REMOTE_STOPPED = 3
FT_REASON_LOCAL_ERROR = 4
FT_REASON_REMOTE_ERROR = 5
55
# FIXME: use constants from tp-python once the spec is undrafted
CHANNEL_TYPE_FILE_TRANSFER = \
        'org.freedesktop.Telepathy.Channel.Type.FileTransfer'

# Sent with a 'file_transfer' keyword argument whenever this module creates
# an IncomingFileTransfer or OutgoingFileTransfer.
new_file_transfer = dispatch.Signal()
61
62
63# TODO Move to use splice_async() in Sugar 0.88
class StreamSplicer(GObject.GObject):
    """Asynchronously copy a Gio input stream into a Gio output stream.

    Data is read in chunks of at most _CHUNK_SIZE bytes and queued; at most
    one write is in flight at any time.  Once the input is exhausted (or a
    read fails) and every queued buffer has been written, the output stream
    is closed and the 'finished' signal is emitted exactly once.
    """

    _CHUNK_SIZE = 10240  # 10K
    __gsignals__ = {
        'finished': (GObject.SignalFlags.RUN_FIRST,
                     None,
                     ([])),
    }

    def __init__(self, input_stream, output_stream):
        """Remember the two streams; nothing is read until start()."""
        GObject.GObject.__init__(self)

        self._input_stream = input_stream
        self._output_stream = output_stream
        # Chunks that were read but not yet handed to the output stream.
        self._pending_buffers = []
        # The GLib.Bytes currently being written, or None when idle.
        self._buffer_in_flight = None
        # True once no further read will be issued (EOF or read error).
        self._input_done = False
        # Guards against closing the output / emitting 'finished' twice.
        self._finished = False

    def start(self):
        """Kick off the copy by requesting the first chunk."""
        self._read_next_chunk()

    def _read_next_chunk(self):
        """Request the next chunk of at most _CHUNK_SIZE bytes."""
        self._input_stream.read_bytes_async(
            self._CHUNK_SIZE, GLib.PRIORITY_LOW,
            None, self.__read_async_cb, None)

    def _close_input(self):
        """Close the input stream and record that reading is over."""
        self._input_done = True
        try:
            self._input_stream.close(None)
        except GLib.Error:
            # A failed close must not keep the queued data from being
            # flushed, so it is only logged.
            logging.exception('Could not close the input stream.')

    def __read_async_cb(self, input_stream, result, user_data):
        try:
            data = input_stream.read_bytes_finish(result)
        except GLib.Error:
            # PyGObject reports a failed read by raising, not by
            # returning None.
            logging.exception('Reading from the input stream failed.')
            data = None

        if data is None:
            # TODO: an error occured. Report something
            logging.error('An error occured in the file transfer.')
            self._close_input()
        elif data.get_size() == 0:
            # We read the file completely
            logging.debug('Closing input stream. Reading finished.')
            self._close_input()
        else:
            logging.debug('Data received (bytes): %s', data.get_size())
            self._pending_buffers.append(data)
            self._read_next_chunk()

        # Either starts the next write or, when nothing is left to write
        # and the input is done, finishes the copy.
        self._write_next_buffer()

    def __write_async_cb(self, output_stream, result, user_data):
        data = self._buffer_in_flight
        self._buffer_in_flight = None

        size = output_stream.write_bytes_finish(result)
        logging.debug('Size written (bytes): %s', size)

        if data is not None and size < data.get_size():
            # Short write: put the unwritten tail back at the front of the
            # queue so that no data is lost and ordering is preserved.
            remainder = GLib.Bytes.new(data.get_data()[size:])
            self._pending_buffers.insert(0, remainder)

        self._write_next_buffer()

    def _write_next_buffer(self):
        """Write the next queued buffer, or finish if there is none left."""
        if self._finished or self._buffer_in_flight is not None:
            return

        if self._pending_buffers:
            self._buffer_in_flight = self._pending_buffers.pop(0)
            self._output_stream.write_bytes_async(
                self._buffer_in_flight, GLib.PRIORITY_LOW, None,
                self.__write_async_cb, None)
        elif self._input_done:
            # Completion is checked here, not only in the write callback:
            # the last write may complete before EOF is read, and an empty
            # input never triggers a write at all.
            self._finish()

    def _finish(self):
        """Close the output stream and emit 'finished' (only once)."""
        if self._finished:
            return
        self._finished = True
        logging.debug('Closing output stream. Writing finished.')
        self._output_stream.close(None)
        self.emit('finished')
121
122
class BaseFileTransfer(GObject.GObject):
    """State shared by incoming and outgoing file transfers.

    Mirrors the properties of a Telepathy FileTransfer channel and exposes
    'state' and 'transferred_bytes' as GObject properties, so that
    'notify::state' / 'notify::transferred-bytes' can be connected to.
    """

    def __init__(self, connection):
        """Store the connection; the channel is attached by set_channel()."""
        GObject.GObject.__init__(self)
        self._connection = connection
        self._state = FT_STATE_NONE
        self._transferred_bytes = 0

        self.channel = None
        self.buddy = None
        self.title = None
        self.file_size = None
        self.description = None
        self.mime_type = None
        self.initial_offset = 0
        self.reason_last_change = FT_REASON_NONE

    def set_channel(self, channel):
        """Attach the channel, track its signals and copy its properties."""
        self.channel = channel

        channel_ft = self.channel[CHANNEL_TYPE_FILE_TRANSFER]
        channel_ft.connect_to_signal(
                'FileTransferStateChanged', self.__state_changed_cb)
        channel_ft.connect_to_signal(
                'TransferredBytesChanged', self.__transferred_bytes_changed_cb)
        channel_ft.connect_to_signal(
                'InitialOffsetDefined', self.__initial_offset_defined_cb)

        channel_properties = self.channel[dbus.PROPERTIES_IFACE]

        props = channel_properties.GetAll(CHANNEL_TYPE_FILE_TRANSFER)
        # Assigned to the backing field directly, so no 'notify::state' is
        # emitted for the initial value.
        self._state = props['State']
        self.title = props['Filename']
        self.file_size = props['Size']
        self.description = props['Description']
        self.mime_type = props['ContentType']

        handle = channel_properties.Get(CHANNEL, 'TargetHandle')
        self.buddy = neighborhood.get_model().get_buddy_by_handle(handle)

    def __transferred_bytes_changed_cb(self, transferred_bytes):
        logging.debug('__transferred_bytes_changed_cb %r', transferred_bytes)
        self.props.transferred_bytes = transferred_bytes

    def _set_transferred_bytes(self, transferred_bytes):
        self._transferred_bytes = transferred_bytes

    def _get_transferred_bytes(self):
        return self._transferred_bytes

    transferred_bytes = GObject.property(type=int, default=0,
            getter=_get_transferred_bytes, setter=_set_transferred_bytes)

    def __initial_offset_defined_cb(self, offset):
        logging.debug('__initial_offset_defined_cb %r', offset)
        self.initial_offset = offset

    def __state_changed_cb(self, state, reason):
        logging.debug('__state_changed_cb %r %r', state, reason)
        # Record the reason first so that 'notify::state' handlers can
        # already read it.
        self.reason_last_change = reason
        self.props.state = state

    def _set_state(self, state):
        self._state = state

    def _get_state(self):
        return self._state

    state = GObject.property(type=int, getter=_get_state, setter=_set_state)

    def cancel(self):
        """Cancel the transfer by closing its channel."""
        self.channel[CHANNEL].Close()
193
194
class IncomingFileTransfer(BaseFileTransfer):
    """A file transfer offered to us over an existing channel.

    Call accept() with a destination path; once the channel state becomes
    FT_STATE_OPEN the data is copied from the channel's Unix socket into
    that file.
    """

    def __init__(self, connection, object_path, props):
        """Wrap the channel at object_path on connection.

        props is accepted for the caller's convenience but not used here;
        the channel properties are fetched again by set_channel().
        """
        BaseFileTransfer.__init__(self, connection)

        # Initialise everything the state handler reads before the handler
        # is connected.
        self.destination_path = None
        self._socket_address = None
        self._socket = None
        self._splicer = None

        channel = Channel(connection.service_name, object_path)
        self.set_channel(channel)

        self.connect('notify::state', self.__notify_state_cb)

    def accept(self, destination_path):
        """Accept the transfer and store the data at destination_path.

        Raises ValueError if destination_path already exists.
        """
        if os.path.exists(destination_path):
            raise ValueError('Destination path already exists: %r' % \
                             destination_path)

        self.destination_path = destination_path

        channel_ft = self.channel[CHANNEL_TYPE_FILE_TRANSFER]
        self._socket_address = channel_ft.AcceptFile(SOCKET_ADDRESS_TYPE_UNIX,
                SOCKET_ACCESS_CONTROL_LOCALHOST, '', 0, byte_arrays=True)

    def __notify_state_cb(self, file_transfer, pspec):
        logging.debug('__notify_state_cb %r', self.props.state)
        if self.props.state != FT_STATE_OPEN:
            return

        # Need to hold a reference to the socket so that python doesn't
        # close the fd when it goes out of scope
        self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._socket.connect(self._socket_address)
        input_stream = Gio.UnixInputStream.new(self._socket.fileno(), True)

        destination_file = Gio.File.new_for_path(self.destination_path)
        if self.initial_offset == 0:
            output_stream = destination_file.create(
                Gio.FileCreateFlags.PRIVATE, None)
        else:
            # append_to() needs the create flags and a cancellable, just
            # like create() above.
            output_stream = destination_file.append_to(
                Gio.FileCreateFlags.PRIVATE, None)

        # TODO: Use splice_async when it gets implemented
        self._splicer = StreamSplicer(input_stream, output_stream)
        self._splicer.start()
239
240
class OutgoingFileTransfer(BaseFileTransfer):
    """A file transfer that we offer to a buddy.

    The channel is requested once the preferred connection is ready; once
    the channel state becomes FT_STATE_OPEN the file is copied into the
    channel's Unix socket.
    """

    def __init__(self, buddy, file_name, title, description, mime_type):
        """Offer the file at file_name to buddy.

        title, description and mime_type are sent as the channel's
        Filename, Description and ContentType; the size is taken from the
        file on disk.
        """
        presence_service = presenceservice.get_instance()
        name, path = presence_service.get_preferred_connection()
        connection = Connection(name, path,
                                ready_handler=self.__connection_ready_cb)

        BaseFileTransfer.__init__(self, connection)
        self.connect('notify::state', self.__notify_state_cb)

        self._file_name = file_name
        self._socket_address = None
        self._socket = None
        self._splicer = None
        self._output_stream = None

        self.buddy = buddy
        self.title = title
        self.file_size = os.stat(file_name).st_size
        self.description = description
        self.mime_type = mime_type

    def __connection_ready_cb(self, connection):
        requests = connection[CONNECTION_INTERFACE_REQUESTS]
        object_path, properties_ = requests.CreateChannel({
            CHANNEL + '.ChannelType': CHANNEL_TYPE_FILE_TRANSFER,
            CHANNEL + '.TargetHandleType': CONNECTION_HANDLE_TYPE_CONTACT,
            CHANNEL + '.TargetHandle': self.buddy.handle,
            CHANNEL_TYPE_FILE_TRANSFER + '.ContentType': self.mime_type,
            CHANNEL_TYPE_FILE_TRANSFER + '.Filename': self.title,
            CHANNEL_TYPE_FILE_TRANSFER + '.Size': self.file_size,
            CHANNEL_TYPE_FILE_TRANSFER + '.Description': self.description,
            CHANNEL_TYPE_FILE_TRANSFER + '.InitialOffset': 0})

        self.set_channel(Channel(connection.service_name, object_path))

        channel_file_transfer = self.channel[CHANNEL_TYPE_FILE_TRANSFER]
        self._socket_address = channel_file_transfer.ProvideFile(
                SOCKET_ADDRESS_TYPE_UNIX, SOCKET_ACCESS_CONTROL_LOCALHOST, '',
                byte_arrays=True)

    def __notify_state_cb(self, file_transfer, pspec):
        logging.debug('__notify_state_cb %r', self.props.state)
        if self.props.state != FT_STATE_OPEN:
            return

        # Need to hold a reference to the socket so that python doesn't
        # close the fd when it goes out of scope
        self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._socket.connect(self._socket_address)
        output_stream = Gio.UnixOutputStream.new(
            self._socket.fileno(), True)

        logging.debug('opening %s for reading', self._file_name)
        input_stream = Gio.File.new_for_path(self._file_name).read(None)
        if self.initial_offset > 0:
            # Pass the cancellable explicitly, as every other Gio call in
            # this module does.
            input_stream.skip(self.initial_offset, None)

        # TODO: Use splice_async when it gets implemented
        self._splicer = StreamSplicer(input_stream, output_stream)
        self._splicer.start()

    def cancel(self):
        """Cancel the transfer by closing its channel."""
        BaseFileTransfer.cancel(self)
304
305
def _new_channels_cb(connection, channels):
    """Announce every file transfer channel that was not requested locally.

    channels is an iterable of (object_path, properties) pairs.  For each
    FileTransfer channel whose 'Requested' property is false, an
    IncomingFileTransfer is created and sent through new_file_transfer.
    """
    for object_path, props in channels:
        if props[CHANNEL + '.ChannelType'] != CHANNEL_TYPE_FILE_TRANSFER:
            continue
        if props[CHANNEL + '.Requested']:
            # Channels we requested ourselves are outgoing transfers.
            continue

        logging.debug('__new_channels_cb %r', object_path)

        incoming_file_transfer = IncomingFileTransfer(connection,
                                                      object_path, props)
        new_file_transfer.send(None, file_transfer=incoming_file_transfer)
316
317
def _monitor_connection(connection):
    """Watch connection for new channels and route them to _new_channels_cb."""
    logging.debug('connection added %r', connection)

    def on_new_channels(channels):
        # The signal only carries the channels; bind the connection here.
        _new_channels_cb(connection, channels)

    connection[CONNECTION_INTERFACE_REQUESTS].connect_to_signal(
        'NewChannels', on_new_channels)
322
323
def _connection_added_cb(conn_watcher, connection):
    """Handle 'connection-added': start monitoring the new connection."""
    _monitor_connection(connection)
326
327
328def _connection_removed_cb(conn_watcher, connection):
329    logging.debug('connection removed %r', connection)
330
331
def init():
    """Start watching every current and future connection for transfers."""
    conn_watcher = connection_watcher.get_instance()
    conn_watcher.connect('connection-added', _connection_added_cb)
    conn_watcher.connect('connection-removed', _connection_removed_cb)

    # Connections that already exist will not get a 'connection-added'.
    for connection in conn_watcher.get_connections():
        _monitor_connection(connection)
339
340
def start_transfer(buddy, file_name, title, description, mime_type):
    """Offer the file at file_name to buddy and announce the new transfer.

    The created OutgoingFileTransfer is sent through new_file_transfer as
    the 'file_transfer' keyword argument; nothing is returned.
    """
    outgoing_file_transfer = OutgoingFileTransfer(buddy, file_name, title,
                                                  description, mime_type)
    new_file_transfer.send(None, file_transfer=outgoing_file_transfer)
345
346
def file_transfer_available():
    """Return True if any connection can create file transfer channels.

    A connection qualifies when its RequestableChannelClasses contain a
    class whose fixed properties are exactly the FileTransfer channel type
    and the contact handle type.  Returns False when no connection
    qualifies, including when there are no connections at all.
    """
    conn_watcher = connection_watcher.get_instance()
    for connection in conn_watcher.get_connections():

        properties_iface = connection[dbus.PROPERTIES_IFACE]
        properties = properties_iface.GetAll(CONNECTION_INTERFACE_REQUESTS)
        classes = properties['RequestableChannelClasses']
        for prop, allowed_prop_ in classes:

            channel_type = prop.get(CHANNEL + '.ChannelType', '')
            target_handle_type = prop.get(CHANNEL + '.TargetHandleType', '')

            if len(prop) == 2 and \
                    channel_type == CHANNEL_TYPE_FILE_TRANSFER and \
                    target_handle_type == CONNECTION_HANDLE_TYPE_CONTACT:
                return True

    # Only give up after every connection has been inspected.
    return False
365
366
if __name__ == '__main__':
    # Manual smoke test: copy a file into a fresh temporary file with
    # StreamSplicer.  The source defaults to the original author's test
    # path and can be overridden with the first command line argument.
    import sys
    import tempfile

    if len(sys.argv) > 1:
        test_file_name = sys.argv[1]
    else:
        test_file_name = '/home/humitos/test.py'

    # mkstemp() also returns an open descriptor; close it so it is not
    # leaked, the file is reopened through Gio below.
    test_temp_fd, test_temp_file = tempfile.mkstemp()
    os.close(test_temp_fd)
    print(test_temp_file)

    test_input_stream = Gio.File.new_for_path(test_file_name).read(None)
    test_output_stream = Gio.File.new_for_path(test_temp_file)\
        .append_to(Gio.FileCreateFlags.PRIVATE, None)

    # TODO: Use splice_async when it gets implemented
    splicer = StreamSplicer(test_input_stream, test_output_stream)

    loop = GLib.MainLoop()
    # Leave the main loop once the copy is done instead of running forever.
    splicer.connect('finished', lambda splicer_: loop.quit())
    splicer.start()
    loop.run()