diff --git a/src/jarabe/model/filetransfer.py b/src/jarabe/model/filetransfer.py
index 820df74..831c1e1 100644
a
|
b
|
import socket |
20 | 20 | |
21 | 21 | from gi.repository import GObject |
22 | 22 | from gi.repository import Gio |
| 23 | from gi.repository import GLib |
23 | 24 | import dbus |
24 | 25 | from telepathy.interfaces import CONNECTION_INTERFACE_REQUESTS, CHANNEL |
25 | 26 | from telepathy.constants import CONNECTION_HANDLE_TYPE_CONTACT, \ |
… |
… |
from sugar3 import dispatch |
33 | 34 | from jarabe.util.telepathy import connection_watcher |
34 | 35 | from jarabe.model import neighborhood |
35 | 36 | |
| 37 | # Avoid "Fatal Python error: GC object already tracked" |
| 38 | # http://stackoverflow.com/questions/7496629/gstreamer-appsrc-causes-random-crashes |
| 39 | GObject.threads_init() |
36 | 40 | |
37 | 41 | FT_STATE_NONE = 0 |
38 | 42 | FT_STATE_PENDING = 1 |
… |
… |
class StreamSplicer(GObject.GObject): |
73 | 77 | self._pending_buffers = [] |
74 | 78 | |
75 | 79 | def start(self): |
76 | | self._input_stream.read_async(self._CHUNK_SIZE, self.__read_async_cb, |
77 | | GObject.PRIORITY_LOW) |
| 80 | self._input_stream.read_async( |
| 81 | self._pending_buffers, self._CHUNK_SIZE, GLib.PRIORITY_LOW, |
| 82 | None, self.__read_async_cb, 'user_data arg') |
78 | 83 | |
79 | | def __read_async_cb(self, input_stream, result): |
| 84 | def __read_async_cb(self, input_stream, result, user_data): |
| 85 | print user_data |
80 | 86 | data = input_stream.read_finish(result) |
81 | 87 | |
82 | 88 | if not data: |
83 | 89 | logging.debug('closing input stream') |
84 | | self._input_stream.close() |
| 90 | self._input_stream.close(None) |
85 | 91 | else: |
86 | 92 | self._pending_buffers.append(data) |
87 | | self._input_stream.read_async(self._CHUNK_SIZE, |
88 | | self.__read_async_cb, |
89 | | GObject.PRIORITY_LOW) |
| 93 | self._input_stream.read_async( |
| 94 | self._pending_buffers, self._CHUNK_SIZE, GLib.PRIORITY_LOW, |
| 95 | None, self.__read_async_cb, 'user_data_arg2') |
90 | 96 | self._write_next_buffer() |
91 | 97 | |
92 | 98 | def __write_async_cb(self, output_stream, result, user_data): |
| 99 | print user_data |
93 | 100 | count_ = output_stream.write_finish(result) |
94 | 101 | |
95 | 102 | if not self._pending_buffers and \ |
96 | 103 | not self._output_stream.has_pending() and \ |
97 | 104 | not self._input_stream.has_pending(): |
98 | 105 | logging.debug('closing output stream') |
99 | | output_stream.close() |
| 106 | output_stream.close(None) |
100 | 107 | self.emit('finished') |
101 | 108 | else: |
102 | 109 | self._write_next_buffer() |
… |
… |
class StreamSplicer(GObject.GObject): |
106 | 113 | data = self._pending_buffers.pop(0) |
107 | 114 | # TODO: we pass the buffer as user_data because of |
108 | 115 | # http://bugzilla.gnome.org/show_bug.cgi?id=564102 |
109 | | self._output_stream.write_async(data, self.__write_async_cb, |
110 | | GObject.PRIORITY_LOW, |
111 | | user_data=data) |
| 116 | self._output_stream.write_async( |
| 117 | data, self._CHUNK_SIZE, GLib.PRIORITY_LOW, None, |
| 118 | self.__write_async_cb, 'user_data_arg3') |
112 | 119 | |
113 | 120 | |
114 | 121 | class BaseFileTransfer(GObject.GObject): |
… |
… |
class IncomingFileTransfer(BaseFileTransfer): |
215 | 222 | # close the fd when it goes out of scope |
216 | 223 | self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
217 | 224 | self._socket.connect(self._socket_address) |
218 | | input_stream = Gio.unix.InputStream(self._socket.fileno(), True) |
| 225 | input_stream = Gio.UnixInputStream.new(self._socket.fileno(), True) |
219 | 226 | |
220 | | destination_file = Gio.File(self.destination_path) |
| 227 | destination_file = Gio.File.new_for_path(self.destination_path) |
221 | 228 | if self.initial_offset == 0: |
222 | | output_stream = destination_file.create() |
| 229 | output_stream = destination_file.create( |
| 230 | Gio.FileCreateFlags.PRIVATE, None) |
223 | 231 | else: |
224 | 232 | output_stream = destination_file.append_to() |
225 | 233 | |
… |
… |
class OutgoingFileTransfer(BaseFileTransfer): |
277 | 285 | # closes the fd when it goes out of scope |
278 | 286 | self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
279 | 287 | self._socket.connect(self._socket_address) |
280 | | output_stream = Gio.unix.OutputStream(self._socket.fileno(), True) |
| 288 | output_stream = Gio.UnixOutputStream.new( |
| 289 | self._socket.fileno(), True) |
281 | 290 | |
282 | 291 | logging.debug('opening %s for reading', self._file_name) |
283 | | input_stream = Gio.File(self._file_name).read() |
| 292 | input_stream = Gio.File.new_for_path(self._file_name).read(None) |
284 | 293 | if self.initial_offset > 0: |
285 | 294 | input_stream.skip(self.initial_offset) |
286 | 295 | |
… |
… |
def file_transfer_available(): |
356 | 365 | if __name__ == '__main__': |
357 | 366 | import tempfile |
358 | 367 | |
359 | | test_file_name = '/home/tomeu/isos/Soas2-200904031934.iso' |
360 | | test_input_stream = Gio.File(test_file_name).read() |
361 | | test_output_stream = Gio.File(tempfile.mkstemp()[1]).append_to() |
| 368 | test_file_name = '/home/humitos/test.py' |
| 369 | test_input_stream = Gio.File.new_for_path(test_file_name).read(None) |
| 370 | test_output_stream = Gio.File.new_for_path( |
| 371 | tempfile.mkstemp()[1]).append_to(Gio.FileCreateFlags.PRIVATE, None) |
362 | 372 | |
363 | 373 | # TODO: Use splice_async when it gets implemented |
364 | 374 | splicer = StreamSplicer(test_input_stream, test_output_stream) |