| 1 | |
|---|
| 2 | |
|---|
| 3 | |
|---|
| 4 | |
|---|
| 5 | |
|---|
| 6 | |
|---|
| 7 | |
|---|
| 8 | |
|---|
| 9 | |
|---|
| 10 | |
|---|
| 11 | |
|---|
| 12 | |
|---|
| 13 | |
|---|
| 14 | |
|---|
| 15 | |
|---|
| 16 | |
|---|
| 17 | |
|---|
| 18 | |
|---|
| 19 | |
|---|
| 20 | |
|---|
| 21 | |
|---|
| 22 | |
|---|
| 23 | |
|---|
| 24 | |
|---|
| 25 | |
|---|
| 26 | |
|---|
| 27 | |
|---|
| 28 | |
|---|
| 29 | |
|---|
| 30 | |
|---|
| 31 | |
|---|
| 32 | |
|---|
| 33 | |
|---|
| 34 | import ftplib |
|---|
| 35 | import operator |
|---|
| 36 | import unittest |
|---|
| 37 | |
|---|
| 38 | import _mock_ftplib |
|---|
| 39 | import _test_base |
|---|
| 40 | import ftp_error |
|---|
| 41 | import ftp_file |
|---|
| 42 | |
|---|
| 43 | |
|---|
| 44 | |
|---|
| 45 | |
|---|
| 46 | |
|---|
| 47 | class ReadMockSession(_mock_ftplib.MockSession): |
|---|
| 48 | mock_file_content = 'line 1\r\nanother line\r\nyet another line' |
|---|
| 49 | |
|---|
| 50 | class AsciiReadMockSession(_mock_ftplib.MockSession): |
|---|
| 51 | mock_file_content = '\r\n'.join(map(str, range(20))) |
|---|
| 52 | |
|---|
| 53 | class InaccessibleDirSession(_mock_ftplib.MockSession): |
|---|
| 54 | _login_dir = '/inaccessible' |
|---|
| 55 | |
|---|
| 56 | def pwd(self): |
|---|
| 57 | return self._login_dir |
|---|
| 58 | |
|---|
| 59 | def cwd(self, dir): |
|---|
| 60 | if dir in (self._login_dir, self._login_dir + '/'): |
|---|
| 61 | raise ftplib.error_perm |
|---|
| 62 | else: |
|---|
| 63 | _mock_ftplib.MockSession.cwd(self, dir) |
|---|
| 64 | |
|---|
| 65 | |
|---|
| 66 | class TestFileOperations(unittest.TestCase): |
|---|
| 67 | """Test operations with file-like objects.""" |
|---|
| 68 | def test_inaccessible_dir(self): |
|---|
| 69 | """Test whether opening a file at an invalid location fails.""" |
|---|
| 70 | host = _test_base.ftp_host_factory( |
|---|
| 71 | session_factory=InaccessibleDirSession) |
|---|
| 72 | self.assertRaises(ftp_error.FTPIOError, host.file, |
|---|
| 73 | '/inaccessible/new_file', 'w') |
|---|
| 74 | |
|---|
| 75 | def test_caching(self): |
|---|
| 76 | """Test whether `_FTPFile` cache of `FTPHost` object works.""" |
|---|
| 77 | host = _test_base.ftp_host_factory() |
|---|
| 78 | self.assertEqual(len(host._children), 0) |
|---|
| 79 | path1 = 'path1' |
|---|
| 80 | path2 = 'path2' |
|---|
| 81 | |
|---|
| 82 | file1 = host.file(path1, 'w') |
|---|
| 83 | child1 = host._children[0] |
|---|
| 84 | self.assertEqual(len(host._children), 1) |
|---|
| 85 | self.failIf(child1._file.closed) |
|---|
| 86 | |
|---|
| 87 | file2 = host.file(path2, 'w') |
|---|
| 88 | child2 = host._children[1] |
|---|
| 89 | self.assertEqual(len(host._children), 2) |
|---|
| 90 | self.failIf(child2._file.closed) |
|---|
| 91 | |
|---|
| 92 | file1.close() |
|---|
| 93 | self.assertEqual(len(host._children), 2) |
|---|
| 94 | self.failUnless(child1._file.closed) |
|---|
| 95 | self.failIf(child2._file.closed) |
|---|
| 96 | |
|---|
| 97 | file1 = host.file(path1, 'w') |
|---|
| 98 | child1_1 = file1._host |
|---|
| 99 | |
|---|
| 100 | self.failUnless(child1 is child1_1) |
|---|
| 101 | self.failIf(child1._file.closed) |
|---|
| 102 | self.failIf(child2._file.closed) |
|---|
| 103 | |
|---|
| 104 | file2.close() |
|---|
| 105 | self.failUnless(child2._file.closed) |
|---|
| 106 | |
|---|
| 107 | def test_write_to_directory(self): |
|---|
| 108 | """Test whether attempting to write to a directory fails.""" |
|---|
| 109 | host = _test_base.ftp_host_factory() |
|---|
| 110 | self.assertRaises(ftp_error.FTPIOError, host.file, |
|---|
| 111 | '/home/sschwarzer', 'w') |
|---|
| 112 | |
|---|
| 113 | def test_binary_write(self): |
|---|
| 114 | """Write binary data with `write`.""" |
|---|
| 115 | host = _test_base.ftp_host_factory() |
|---|
| 116 | data = '\000a\001b\r\n\002c\003\n\004\r\005' |
|---|
| 117 | output = host.file('dummy', 'wb') |
|---|
| 118 | output.write(data) |
|---|
| 119 | output.close() |
|---|
| 120 | child_data = _mock_ftplib.content_of('dummy') |
|---|
| 121 | expected_data = data |
|---|
| 122 | self.assertEqual(child_data, expected_data) |
|---|
| 123 | |
|---|
| 124 | def test_ascii_write(self): |
|---|
| 125 | """Write ASCII text with `write`.""" |
|---|
| 126 | host = _test_base.ftp_host_factory() |
|---|
| 127 | data = ' \nline 2\nline 3' |
|---|
| 128 | output = host.file('dummy', 'w') |
|---|
| 129 | output.write(data) |
|---|
| 130 | output.close() |
|---|
| 131 | child_data = _mock_ftplib.content_of('dummy') |
|---|
| 132 | expected_data = ' \r\nline 2\r\nline 3' |
|---|
| 133 | self.assertEqual(child_data, expected_data) |
|---|
| 134 | |
|---|
| 135 | def test_ascii_writelines(self): |
|---|
| 136 | """Write ASCII text with `writelines`.""" |
|---|
| 137 | host = _test_base.ftp_host_factory() |
|---|
| 138 | data = [' \n', 'line 2\n', 'line 3'] |
|---|
| 139 | backup_data = data[:] |
|---|
| 140 | output = host.file('dummy', 'w') |
|---|
| 141 | output.writelines(data) |
|---|
| 142 | output.close() |
|---|
| 143 | child_data = _mock_ftplib.content_of('dummy') |
|---|
| 144 | expected_data = ' \r\nline 2\r\nline 3' |
|---|
| 145 | self.assertEqual(child_data, expected_data) |
|---|
| 146 | |
|---|
| 147 | self.assertEqual(data, backup_data) |
|---|
| 148 | |
|---|
| 149 | def test_ascii_read(self): |
|---|
| 150 | """Read ASCII text with plain `read`.""" |
|---|
| 151 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 152 | input_ = host.file('dummy', 'r') |
|---|
| 153 | data = input_.read(0) |
|---|
| 154 | self.assertEqual(data, '') |
|---|
| 155 | data = input_.read(3) |
|---|
| 156 | self.assertEqual(data, 'lin') |
|---|
| 157 | data = input_.read(7) |
|---|
| 158 | self.assertEqual(data, 'e 1\nano') |
|---|
| 159 | data = input_.read() |
|---|
| 160 | self.assertEqual(data, 'ther line\nyet another line') |
|---|
| 161 | data = input_.read() |
|---|
| 162 | self.assertEqual(data, '') |
|---|
| 163 | input_.close() |
|---|
| 164 | |
|---|
| 165 | |
|---|
| 166 | host = _test_base.ftp_host_factory(session_factory=AsciiReadMockSession) |
|---|
| 167 | expected_data = AsciiReadMockSession.mock_file_content.\ |
|---|
| 168 | replace('\r\n', '\n') |
|---|
| 169 | input_ = host.file('dummy', 'r') |
|---|
| 170 | data = input_.read(len(expected_data)) |
|---|
| 171 | self.assertEqual(data, expected_data) |
|---|
| 172 | |
|---|
| 173 | def test_binary_readline(self): |
|---|
| 174 | """Read binary data with `readline`.""" |
|---|
| 175 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 176 | input_ = host.file('dummy', 'rb') |
|---|
| 177 | data = input_.readline(3) |
|---|
| 178 | self.assertEqual(data, 'lin') |
|---|
| 179 | data = input_.readline(10) |
|---|
| 180 | self.assertEqual(data, 'e 1\r\n') |
|---|
| 181 | data = input_.readline(13) |
|---|
| 182 | self.assertEqual(data, 'another line\r') |
|---|
| 183 | data = input_.readline() |
|---|
| 184 | self.assertEqual(data, '\n') |
|---|
| 185 | data = input_.readline() |
|---|
| 186 | self.assertEqual(data, 'yet another line') |
|---|
| 187 | data = input_.readline() |
|---|
| 188 | self.assertEqual(data, '') |
|---|
| 189 | input_.close() |
|---|
| 190 | |
|---|
| 191 | def test_ascii_readline(self): |
|---|
| 192 | """Read ASCII text with `readline`.""" |
|---|
| 193 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 194 | input_ = host.file('dummy', 'r') |
|---|
| 195 | data = input_.readline(3) |
|---|
| 196 | self.assertEqual(data, 'lin') |
|---|
| 197 | data = input_.readline(10) |
|---|
| 198 | self.assertEqual(data, 'e 1\n') |
|---|
| 199 | data = input_.readline(13) |
|---|
| 200 | self.assertEqual(data, 'another line\n') |
|---|
| 201 | data = input_.readline() |
|---|
| 202 | self.assertEqual(data, 'yet another line') |
|---|
| 203 | data = input_.readline() |
|---|
| 204 | self.assertEqual(data, '') |
|---|
| 205 | input_.close() |
|---|
| 206 | |
|---|
| 207 | def test_ascii_readlines(self): |
|---|
| 208 | """Read ASCII text with `readlines`.""" |
|---|
| 209 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 210 | input_ = host.file('dummy', 'r') |
|---|
| 211 | data = input_.read(3) |
|---|
| 212 | self.assertEqual(data, 'lin') |
|---|
| 213 | data = input_.readlines() |
|---|
| 214 | self.assertEqual(data, ['e 1\n', 'another line\n', |
|---|
| 215 | 'yet another line']) |
|---|
| 216 | input_.close() |
|---|
| 217 | |
|---|
| 218 | def test_binary_iterator(self): |
|---|
| 219 | """Test the iterator interface of `FTPFile` objects.""" |
|---|
| 220 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 221 | input_ = host.file('dummy') |
|---|
| 222 | input_iterator = iter(input_) |
|---|
| 223 | self.assertEqual(input_iterator.next(), "line 1\n") |
|---|
| 224 | self.assertEqual(input_iterator.next(), "another line\n") |
|---|
| 225 | self.assertEqual(input_iterator.next(), "yet another line") |
|---|
| 226 | self.assertRaises(StopIteration, input_iterator.next) |
|---|
| 227 | input_.close() |
|---|
| 228 | |
|---|
| 229 | def test_ascii_iterator(self): |
|---|
| 230 | """Test the iterator interface of `FTPFile` objects.""" |
|---|
| 231 | host = _test_base.ftp_host_factory(session_factory=ReadMockSession) |
|---|
| 232 | input_ = host.file('dummy', 'rb') |
|---|
| 233 | input_iterator = iter(input_) |
|---|
| 234 | self.assertEqual(input_iterator.next(), "line 1\r\n") |
|---|
| 235 | self.assertEqual(input_iterator.next(), "another line\r\n") |
|---|
| 236 | self.assertEqual(input_iterator.next(), "yet another line") |
|---|
| 237 | self.assertRaises(StopIteration, input_iterator.next) |
|---|
| 238 | input_.close() |
|---|
| 239 | |
|---|
| 240 | def test_read_unknown_file(self): |
|---|
| 241 | """Test whether reading a file which isn't there fails.""" |
|---|
| 242 | host = _test_base.ftp_host_factory() |
|---|
| 243 | self.assertRaises(ftp_error.FTPIOError, host.file, 'notthere', 'r') |
|---|
| 244 | |
|---|
| 245 | |
|---|
| 246 | if __name__ == '__main__': |
|---|
| 247 | unittest.main() |
|---|