1 | # Copyright (C) 2008 Tomeu Vizoso |
---|
2 | # |
---|
3 | # This program is free software; you can redistribute it and/or modify |
---|
4 | # it under the terms of the GNU General Public License as published by |
---|
5 | # the Free Software Foundation; either version 2 of the License, or |
---|
6 | # (at your option) any later version. |
---|
7 | # |
---|
8 | # This program is distributed in the hope that it will be useful, |
---|
9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
---|
10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
---|
11 | # GNU General Public License for more details. |
---|
12 | # |
---|
13 | # You should have received a copy of the GNU General Public License |
---|
14 | # along with this program; if not, write to the Free Software |
---|
15 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
---|
16 | |
---|
17 | import os |
---|
18 | import logging |
---|
19 | import socket |
---|
20 | |
---|
21 | from gi.repository import GObject |
---|
22 | from gi.repository import Gio |
---|
23 | from gi.repository import GLib |
---|
24 | import dbus |
---|
25 | from telepathy.interfaces import CONNECTION_INTERFACE_REQUESTS, CHANNEL |
---|
26 | from telepathy.constants import CONNECTION_HANDLE_TYPE_CONTACT, \ |
---|
27 | SOCKET_ADDRESS_TYPE_UNIX, \ |
---|
28 | SOCKET_ACCESS_CONTROL_LOCALHOST |
---|
29 | from telepathy.client import Connection, Channel |
---|
30 | |
---|
31 | from sugar3.presence import presenceservice |
---|
32 | from sugar3 import dispatch |
---|
33 | |
---|
34 | from jarabe.util.telepathy import connection_watcher |
---|
35 | from jarabe.model import neighborhood |
---|
36 | |
---|
37 | # Avoid "Fatal Python error: GC object already tracked" |
---|
38 | # http://stackoverflow.com/questions/7496629/gstreamer-appsrc-causes-random-crashes |
---|
39 | GObject.threads_init() |
---|
40 | |
---|
41 | FT_STATE_NONE = 0 |
---|
42 | FT_STATE_PENDING = 1 |
---|
43 | FT_STATE_ACCEPTED = 2 |
---|
44 | FT_STATE_OPEN = 3 |
---|
45 | FT_STATE_COMPLETED = 4 |
---|
46 | FT_STATE_CANCELLED = 5 |
---|
47 | |
---|
48 | FT_REASON_NONE = 0 |
---|
49 | FT_REASON_REQUESTED = 1 |
---|
50 | FT_REASON_LOCAL_STOPPED = 2 |
---|
51 | FT_REASON_REMOTE_STOPPED = 3 |
---|
52 | FT_REASON_LOCAL_ERROR = 4 |
---|
53 | FT_REASON_LOCAL_ERROR = 5 |
---|
54 | FT_REASON_REMOTE_ERROR = 6 |
---|
55 | |
---|
56 | # FIXME: use constants from tp-python once the spec is undrafted |
---|
57 | CHANNEL_TYPE_FILE_TRANSFER = \ |
---|
58 | 'org.freedesktop.Telepathy.Channel.Type.FileTransfer' |
---|
59 | |
---|
60 | new_file_transfer = dispatch.Signal() |
---|
61 | |
---|
62 | |
---|
63 | # TODO Move to use splice_async() in Sugar 0.88 |
---|
64 | class StreamSplicer(GObject.GObject): |
---|
65 | _CHUNK_SIZE = 10240 # 10K |
---|
66 | __gsignals__ = { |
---|
67 | 'finished': (GObject.SignalFlags.RUN_FIRST, |
---|
68 | None, |
---|
69 | ([])), |
---|
70 | } |
---|
71 | |
---|
72 | def __init__(self, input_stream, output_stream): |
---|
73 | GObject.GObject.__init__(self) |
---|
74 | |
---|
75 | self._input_stream = input_stream |
---|
76 | self._output_stream = output_stream |
---|
77 | self._pending_buffers = [] |
---|
78 | |
---|
79 | def start(self): |
---|
80 | self._input_stream.read_bytes_async( |
---|
81 | self._CHUNK_SIZE, GLib.PRIORITY_LOW, |
---|
82 | None, self.__read_async_cb, None) |
---|
83 | |
---|
84 | def __read_async_cb(self, input_stream, result, user_data): |
---|
85 | data = input_stream.read_bytes_finish(result) |
---|
86 | |
---|
87 | if data is None: |
---|
88 | # TODO: an error occured. Report something |
---|
89 | logging.error('An error occured in the file transfer.') |
---|
90 | elif data.get_size() == 0: |
---|
91 | # We read the file completely |
---|
92 | logging.debug('Closing input stream. Reading finished.') |
---|
93 | self._input_stream.close(None) |
---|
94 | else: |
---|
95 | logging.debug('Data received (bytes): %s', data.get_size()) |
---|
96 | self._pending_buffers.append(data) |
---|
97 | self._input_stream.read_bytes_async( |
---|
98 | self._CHUNK_SIZE, GLib.PRIORITY_LOW, |
---|
99 | None, self.__read_async_cb, None) |
---|
100 | self._write_next_buffer() |
---|
101 | |
---|
102 | def __write_async_cb(self, output_stream, result, user_data): |
---|
103 | size = output_stream.write_bytes_finish(result) |
---|
104 | logging.debug('Size written (bytes): %s', size) |
---|
105 | |
---|
106 | if not self._pending_buffers and \ |
---|
107 | not self._output_stream.has_pending() and \ |
---|
108 | not self._input_stream.has_pending(): |
---|
109 | logging.debug('Closing output stream. Writing finished.') |
---|
110 | output_stream.close(None) |
---|
111 | self.emit('finished') |
---|
112 | else: |
---|
113 | self._write_next_buffer() |
---|
114 | |
---|
115 | def _write_next_buffer(self): |
---|
116 | if self._pending_buffers and not self._output_stream.has_pending(): |
---|
117 | data = self._pending_buffers.pop(0) |
---|
118 | self._output_stream.write_bytes_async( |
---|
119 | data, GLib.PRIORITY_LOW, None, |
---|
120 | self.__write_async_cb, None) |
---|
121 | |
---|
122 | |
---|
123 | class BaseFileTransfer(GObject.GObject): |
---|
124 | |
---|
125 | def __init__(self, connection): |
---|
126 | GObject.GObject.__init__(self) |
---|
127 | self._connection = connection |
---|
128 | self._state = FT_STATE_NONE |
---|
129 | self._transferred_bytes = 0 |
---|
130 | |
---|
131 | self.channel = None |
---|
132 | self.buddy = None |
---|
133 | self.title = None |
---|
134 | self.file_size = None |
---|
135 | self.description = None |
---|
136 | self.mime_type = None |
---|
137 | self.initial_offset = 0 |
---|
138 | self.reason_last_change = FT_REASON_NONE |
---|
139 | |
---|
140 | def set_channel(self, channel): |
---|
141 | self.channel = channel |
---|
142 | self.channel[CHANNEL_TYPE_FILE_TRANSFER].connect_to_signal( |
---|
143 | 'FileTransferStateChanged', self.__state_changed_cb) |
---|
144 | self.channel[CHANNEL_TYPE_FILE_TRANSFER].connect_to_signal( |
---|
145 | 'TransferredBytesChanged', self.__transferred_bytes_changed_cb) |
---|
146 | self.channel[CHANNEL_TYPE_FILE_TRANSFER].connect_to_signal( |
---|
147 | 'InitialOffsetDefined', self.__initial_offset_defined_cb) |
---|
148 | |
---|
149 | channel_properties = self.channel[dbus.PROPERTIES_IFACE] |
---|
150 | |
---|
151 | props = channel_properties.GetAll(CHANNEL_TYPE_FILE_TRANSFER) |
---|
152 | self._state = props['State'] |
---|
153 | self.title = props['Filename'] |
---|
154 | self.file_size = props['Size'] |
---|
155 | self.description = props['Description'] |
---|
156 | self.mime_type = props['ContentType'] |
---|
157 | |
---|
158 | handle = channel_properties.Get(CHANNEL, 'TargetHandle') |
---|
159 | self.buddy = neighborhood.get_model().get_buddy_by_handle(handle) |
---|
160 | |
---|
161 | def __transferred_bytes_changed_cb(self, transferred_bytes): |
---|
162 | logging.debug('__transferred_bytes_changed_cb %r', transferred_bytes) |
---|
163 | self.props.transferred_bytes = transferred_bytes |
---|
164 | |
---|
165 | def _set_transferred_bytes(self, transferred_bytes): |
---|
166 | self._transferred_bytes = transferred_bytes |
---|
167 | |
---|
168 | def _get_transferred_bytes(self): |
---|
169 | return self._transferred_bytes |
---|
170 | |
---|
171 | transferred_bytes = GObject.property(type=int, default=0, |
---|
172 | getter=_get_transferred_bytes, setter=_set_transferred_bytes) |
---|
173 | |
---|
174 | def __initial_offset_defined_cb(self, offset): |
---|
175 | logging.debug('__initial_offset_defined_cb %r', offset) |
---|
176 | self.initial_offset = offset |
---|
177 | |
---|
178 | def __state_changed_cb(self, state, reason): |
---|
179 | logging.debug('__state_changed_cb %r %r', state, reason) |
---|
180 | self.reason_last_change = reason |
---|
181 | self.props.state = state |
---|
182 | |
---|
183 | def _set_state(self, state): |
---|
184 | self._state = state |
---|
185 | |
---|
186 | def _get_state(self): |
---|
187 | return self._state |
---|
188 | |
---|
189 | state = GObject.property(type=int, getter=_get_state, setter=_set_state) |
---|
190 | |
---|
191 | def cancel(self): |
---|
192 | self.channel[CHANNEL].Close() |
---|
193 | |
---|
194 | |
---|
195 | class IncomingFileTransfer(BaseFileTransfer): |
---|
196 | def __init__(self, connection, object_path, props): |
---|
197 | BaseFileTransfer.__init__(self, connection) |
---|
198 | |
---|
199 | channel = Channel(connection.service_name, object_path) |
---|
200 | self.set_channel(channel) |
---|
201 | |
---|
202 | self.connect('notify::state', self.__notify_state_cb) |
---|
203 | |
---|
204 | self.destination_path = None |
---|
205 | self._socket_address = None |
---|
206 | self._socket = None |
---|
207 | self._splicer = None |
---|
208 | |
---|
209 | def accept(self, destination_path): |
---|
210 | if os.path.exists(destination_path): |
---|
211 | raise ValueError('Destination path already exists: %r' % \ |
---|
212 | destination_path) |
---|
213 | |
---|
214 | self.destination_path = destination_path |
---|
215 | |
---|
216 | channel_ft = self.channel[CHANNEL_TYPE_FILE_TRANSFER] |
---|
217 | self._socket_address = channel_ft.AcceptFile(SOCKET_ADDRESS_TYPE_UNIX, |
---|
218 | SOCKET_ACCESS_CONTROL_LOCALHOST, '', 0, byte_arrays=True) |
---|
219 | |
---|
220 | def __notify_state_cb(self, file_transfer, pspec): |
---|
221 | logging.debug('__notify_state_cb %r', self.props.state) |
---|
222 | if self.props.state == FT_STATE_OPEN: |
---|
223 | # Need to hold a reference to the socket so that python doesn't |
---|
224 | # close the fd when it goes out of scope |
---|
225 | self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
---|
226 | self._socket.connect(self._socket_address) |
---|
227 | input_stream = Gio.UnixInputStream.new(self._socket.fileno(), True) |
---|
228 | |
---|
229 | destination_file = Gio.File.new_for_path(self.destination_path) |
---|
230 | if self.initial_offset == 0: |
---|
231 | output_stream = destination_file.create( |
---|
232 | Gio.FileCreateFlags.PRIVATE, None) |
---|
233 | else: |
---|
234 | output_stream = destination_file.append_to() |
---|
235 | |
---|
236 | # TODO: Use splice_async when it gets implemented |
---|
237 | self._splicer = StreamSplicer(input_stream, output_stream) |
---|
238 | self._splicer.start() |
---|
239 | |
---|
240 | |
---|
241 | class OutgoingFileTransfer(BaseFileTransfer): |
---|
242 | def __init__(self, buddy, file_name, title, description, mime_type): |
---|
243 | |
---|
244 | presence_service = presenceservice.get_instance() |
---|
245 | name, path = presence_service.get_preferred_connection() |
---|
246 | connection = Connection(name, path, |
---|
247 | ready_handler=self.__connection_ready_cb) |
---|
248 | |
---|
249 | BaseFileTransfer.__init__(self, connection) |
---|
250 | self.connect('notify::state', self.__notify_state_cb) |
---|
251 | |
---|
252 | self._file_name = file_name |
---|
253 | self._socket_address = None |
---|
254 | self._socket = None |
---|
255 | self._splicer = None |
---|
256 | self._output_stream = None |
---|
257 | |
---|
258 | self.buddy = buddy |
---|
259 | self.title = title |
---|
260 | self.file_size = os.stat(file_name).st_size |
---|
261 | self.description = description |
---|
262 | self.mime_type = mime_type |
---|
263 | |
---|
264 | def __connection_ready_cb(self, connection): |
---|
265 | requests = connection[CONNECTION_INTERFACE_REQUESTS] |
---|
266 | object_path, properties_ = requests.CreateChannel({ |
---|
267 | CHANNEL + '.ChannelType': CHANNEL_TYPE_FILE_TRANSFER, |
---|
268 | CHANNEL + '.TargetHandleType': CONNECTION_HANDLE_TYPE_CONTACT, |
---|
269 | CHANNEL + '.TargetHandle': self.buddy.handle, |
---|
270 | CHANNEL_TYPE_FILE_TRANSFER + '.ContentType': self.mime_type, |
---|
271 | CHANNEL_TYPE_FILE_TRANSFER + '.Filename': self.title, |
---|
272 | CHANNEL_TYPE_FILE_TRANSFER + '.Size': self.file_size, |
---|
273 | CHANNEL_TYPE_FILE_TRANSFER + '.Description': self.description, |
---|
274 | CHANNEL_TYPE_FILE_TRANSFER + '.InitialOffset': 0}) |
---|
275 | |
---|
276 | self.set_channel(Channel(connection.service_name, object_path)) |
---|
277 | |
---|
278 | channel_file_transfer = self.channel[CHANNEL_TYPE_FILE_TRANSFER] |
---|
279 | self._socket_address = channel_file_transfer.ProvideFile( |
---|
280 | SOCKET_ADDRESS_TYPE_UNIX, SOCKET_ACCESS_CONTROL_LOCALHOST, '', |
---|
281 | byte_arrays=True) |
---|
282 | |
---|
283 | def __notify_state_cb(self, file_transfer, pspec): |
---|
284 | logging.debug('__notify_state_cb %r', self.props.state) |
---|
285 | if self.props.state == FT_STATE_OPEN: |
---|
286 | # Need to hold a reference to the socket so that python doesn't |
---|
287 | # closes the fd when it goes out of scope |
---|
288 | self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
---|
289 | self._socket.connect(self._socket_address) |
---|
290 | output_stream = Gio.UnixOutputStream.new( |
---|
291 | self._socket.fileno(), True) |
---|
292 | |
---|
293 | logging.debug('opening %s for reading', self._file_name) |
---|
294 | input_stream = Gio.File.new_for_path(self._file_name).read(None) |
---|
295 | if self.initial_offset > 0: |
---|
296 | input_stream.skip(self.initial_offset) |
---|
297 | |
---|
298 | # TODO: Use splice_async when it gets implemented |
---|
299 | self._splicer = StreamSplicer(input_stream, output_stream) |
---|
300 | self._splicer.start() |
---|
301 | |
---|
302 | def cancel(self): |
---|
303 | self.channel[CHANNEL].Close() |
---|
304 | |
---|
305 | |
---|
306 | def _new_channels_cb(connection, channels): |
---|
307 | for object_path, props in channels: |
---|
308 | if props[CHANNEL + '.ChannelType'] == CHANNEL_TYPE_FILE_TRANSFER and \ |
---|
309 | not props[CHANNEL + '.Requested']: |
---|
310 | |
---|
311 | logging.debug('__new_channels_cb %r', object_path) |
---|
312 | |
---|
313 | incoming_file_transfer = IncomingFileTransfer(connection, |
---|
314 | object_path, props) |
---|
315 | new_file_transfer.send(None, file_transfer=incoming_file_transfer) |
---|
316 | |
---|
317 | |
---|
318 | def _monitor_connection(connection): |
---|
319 | logging.debug('connection added %r', connection) |
---|
320 | connection[CONNECTION_INTERFACE_REQUESTS].connect_to_signal('NewChannels', |
---|
321 | lambda channels: _new_channels_cb(connection, channels)) |
---|
322 | |
---|
323 | |
---|
324 | def _connection_added_cb(conn_watcher, connection): |
---|
325 | _monitor_connection(connection) |
---|
326 | |
---|
327 | |
---|
328 | def _connection_removed_cb(conn_watcher, connection): |
---|
329 | logging.debug('connection removed %r', connection) |
---|
330 | |
---|
331 | |
---|
332 | def init(): |
---|
333 | conn_watcher = connection_watcher.get_instance() |
---|
334 | conn_watcher.connect('connection-added', _connection_added_cb) |
---|
335 | conn_watcher.connect('connection-removed', _connection_removed_cb) |
---|
336 | |
---|
337 | for connection in conn_watcher.get_connections(): |
---|
338 | _monitor_connection(connection) |
---|
339 | |
---|
340 | |
---|
341 | def start_transfer(buddy, file_name, title, description, mime_type): |
---|
342 | outgoing_file_transfer = OutgoingFileTransfer(buddy, file_name, title, |
---|
343 | description, mime_type) |
---|
344 | new_file_transfer.send(None, file_transfer=outgoing_file_transfer) |
---|
345 | |
---|
346 | |
---|
347 | def file_transfer_available(): |
---|
348 | conn_watcher = connection_watcher.get_instance() |
---|
349 | for connection in conn_watcher.get_connections(): |
---|
350 | |
---|
351 | properties_iface = connection[dbus.PROPERTIES_IFACE] |
---|
352 | properties = properties_iface.GetAll(CONNECTION_INTERFACE_REQUESTS) |
---|
353 | classes = properties['RequestableChannelClasses'] |
---|
354 | for prop, allowed_prop in classes: |
---|
355 | |
---|
356 | channel_type = prop.get(CHANNEL + '.ChannelType', '') |
---|
357 | target_handle_type = prop.get(CHANNEL + '.TargetHandleType', '') |
---|
358 | |
---|
359 | if len(prop) == 2 and \ |
---|
360 | channel_type == CHANNEL_TYPE_FILE_TRANSFER and \ |
---|
361 | target_handle_type == CONNECTION_HANDLE_TYPE_CONTACT: |
---|
362 | return True |
---|
363 | |
---|
364 | return False |
---|
365 | |
---|
366 | |
---|
367 | if __name__ == '__main__': |
---|
368 | import tempfile |
---|
369 | |
---|
370 | test_file_name = '/home/humitos/test.py' |
---|
371 | test_temp_file = tempfile.mkstemp()[1] |
---|
372 | print test_temp_file |
---|
373 | test_input_stream = Gio.File.new_for_path(test_file_name).read(None) |
---|
374 | test_output_stream = Gio.File.new_for_path(test_temp_file)\ |
---|
375 | .append_to(Gio.FileCreateFlags.PRIVATE, None) |
---|
376 | |
---|
377 | # TODO: Use splice_async when it gets implemented |
---|
378 | splicer = StreamSplicer(test_input_stream, test_output_stream) |
---|
379 | splicer.start() |
---|
380 | |
---|
381 | loop = GObject.MainLoop() |
---|
382 | loop.run() |
---|