Ticket #4242: test_copying_file.py

File test_copying_file.py, 2.4 KB (added by humitos, 11 years ago)

An isolated example, using the same logic, that can be tested easily on a local machine

Line 
1import tempfile
2import epdb
3import StringIO
4
5from gi.repository import GObject
6from gi.repository import Gio
7from gi.repository import GLib
8
9
10
# Workaround for the interpreter abort
# "Fatal Python error: GC object already tracked"; background in
# http://stackoverflow.com/questions/7496629/gstreamer-appsrc-causes-random-crashes
GObject.threads_init()
14
15
class FileTransfer(GObject.GObject):
    """Copy an input stream to an output stream in asynchronous chunks.

    Chunks of up to ``_CHUNK_SIZE`` bytes are requested with
    ``read_bytes_async`` and queued in ``_pending_buffers``; queued chunks
    are written one at a time with ``write_bytes_async``.  Once a
    zero-length read has signalled end of input and the queue has drained,
    the output stream is closed.

    Exceptions raised by the ``*_finish`` calls are not caught here.
    The print calls are debugging output for this test script.
    """

    _CHUNK_SIZE = 10240

    def __init__(self, input_stream, output_stream):
        """Remember the two streams; nothing is read until start().

        input_stream -- stream to read from (expected: Gio.InputStream)
        output_stream -- stream to write to (expected: Gio.OutputStream)
        """
        GObject.GObject.__init__(self)

        self._input_stream = input_stream
        self._output_stream = output_stream
        # FIFO of byte strings that were read but not yet written.
        self._pending_buffers = []
        # True once a zero-length read has been seen.
        self._input_finished = False
        # True between write_bytes_async() and its callback, so that only
        # one write is outstanding on the output stream at a time.
        self._write_in_flight = False

    def start(self):
        """Begin the transfer by requesting the first chunk."""
        self._read_next_chunk('user_data arg')

    def _read_next_chunk(self, user_data):
        """Ask for up to _CHUNK_SIZE bytes; the result goes to the read cb."""
        self._input_stream.read_bytes_async(
            self._CHUNK_SIZE, GLib.PRIORITY_LOW, None,
            self.__read_async_cb, user_data)

    def __read_async_cb(self, input_stream, result, user_data):
        """Queue the chunk that was just read, or close the input at EOF."""
        print(input_stream)
        print(result)
        chunk = input_stream.read_bytes_finish(result)
        size = chunk.get_size()
        print(size)

        if size == 0:
            # A zero-length result means there is nothing left to read.
            self._input_finished = True
            self._input_stream.close(None)
        else:
            self._pending_buffers.append(chunk.get_data())
            self._read_next_chunk('user_data_arg2')
        self._write_next_buffer()

    def __write_async_cb(self, output_stream, result, user_data):
        """Finish one write; user_data is the byte string that was submitted."""
        self._write_in_flight = False
        written = output_stream.write_bytes_finish(result)
        print(written)

        if written < len(user_data):
            # Short write: put the unwritten tail back at the head of the
            # queue so the byte order is preserved.
            self._pending_buffers.insert(0, user_data[written:])
        self._write_next_buffer()

    def _write_next_buffer(self):
        """Write the next queued chunk, or close the output when all is done."""
        print(self._output_stream)

        if self._write_in_flight:
            return

        if self._pending_buffers:
            data = self._pending_buffers.pop(0)
            self._write_in_flight = True
            # The chunk is also passed as user_data so the callback can
            # re-queue the remainder after a short write.  (The original
            # code referenced
            # http://bugzilla.gnome.org/show_bug.cgi?id=564102 for passing
            # the buffer as user_data.)
            self._output_stream.write_bytes_async(
                GLib.Bytes.new(data), GLib.PRIORITY_LOW, None,
                self.__write_async_cb, data)
        elif self._input_finished:
            # Input exhausted, queue empty and no write outstanding.
            self._output_stream.close(None)
68
69
# Driver: copy test_file_name into a fresh temporary file using FileTransfer
# and run a main loop so the asynchronous callbacks get dispatched.
import os

test_file_name = '/home/humitos/test.py'
test_input_stream = Gio.File.new_for_path(test_file_name).read(None)

# mkstemp() returns an already-open descriptor together with the path.
# Only the path is needed here, so close the descriptor instead of leaking it.
test_output_fd, test_output_path = tempfile.mkstemp()
os.close(test_output_fd)
test_output_stream = Gio.File.new_for_path(test_output_path).append_to(
    Gio.FileCreateFlags.PRIVATE, None)

splicer = FileTransfer(test_input_stream, test_output_stream)
splicer.start()

# Nothing in this script stops the loop; interrupt the process to exit.
loop = GObject.MainLoop()
loop.run()