Ignore:
Timestamp:
Dec 28, 2012, 11:16:37 AM (7 years ago)
Author:
Stefan Schwarzer <sschwarzer@…>
Branch:
default
Message:
Extracted `MockUnixSession` corresponding to `MockMSFormatSession`.

Also, a few tests had to be adapted as a consequence.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • test/mock_ftplib.py

    r1139 r1140  
    1 # Copyright (C) 2003-2009, Stefan Schwarzer <sschwarzer@sschwarzer.net>
     1# Copyright (C) 2003-2012, Stefan Schwarzer <sschwarzer@sschwarzer.net>
    22# See the file LICENSE for licensing terms.
    33
     
    8383    current_dir = '/home/sschwarzer'
    8484
    85     # Used by `MockSession.dir`
     85    # Used by `MockSession.dir`. This is a mapping from absolute path
     86    # to the multi-line string that would show up in an FTP
     87    # command-line client for this directory.
     88    dir_contents = {}
     89
     90    # File content to be used (indirectly) with `transfercmd`.
     91    mock_file_content = ''
     92
     93    def __init__(self, host='', user='', password=''):
     94        self.closed = 0
     95        # Count successful `transfercmd` invocations to ensure that
     96        # each has a corresponding `voidresp`.
     97        self._transfercmds = 0
     98        # Dummy, only for getting/setting timeout in `_FTPFile.close`
     99        self.sock = MockSocket("", "")
     100
     101    def voidcmd(self, cmd):
     102        if DEBUG:
     103            print cmd
     104        if cmd == 'STAT':
     105            return 'MockSession server awaiting your commands ;-)'
     106        elif cmd.startswith('TYPE '):
     107            return
     108        elif cmd.startswith('SITE CHMOD'):
     109            raise ftplib.error_perm("502 command not implemented")
     110        else:
     111            raise ftplib.error_perm
     112
     113    def pwd(self):
     114        return self.current_dir
     115
     116    def _remove_trailing_slash(self, path):
     117        if path != '/' and path.endswith('/'):
     118            path = path[:-1]
     119        return path
     120
     121    def _transform_path(self, path):
     122        return posixpath.normpath(posixpath.join(self.pwd(), path))
     123
     124    def cwd(self, path):
     125        self.current_dir = self._transform_path(path)
     126
     127    def dir(self, *args):
     128        """Provide a callback function for processing each line of
     129        a directory listing. Return nothing.
     130        """
     131        if callable(args[-1]):
     132            callback = args[-1]
     133            args = args[:-1]
     134        else:
     135            callback = None
     136        # Everything before the path argument are options.
     137        path = args[-1]
     138        if DEBUG:
     139            print 'dir: %s' % path
     140        path = self._transform_path(path)
     141        if not self.dir_contents.has_key(path):
     142            raise ftplib.error_perm
     143        dir_lines = self.dir_contents[path].split('\n')
     144        for line in dir_lines:
     145            if callback is None:
     146                print line
     147            else:
     148                callback(line)
     149
     150    def voidresp(self):
     151        assert self._transfercmds == 1
     152        self._transfercmds = self._transfercmds - 1
     153        return '2xx'
     154
     155    def transfercmd(self, cmd):
     156        """
     157        Return a `MockSocket` object whose `makefile` method will
     158        return a mock file object.
     159        """
     160        if DEBUG:
     161            print cmd
     162        # Fail if attempting to read from/write to a directory
     163        cmd, path = cmd.split()
     164        path = self._remove_trailing_slash(path)
     165        if self.dir_contents.has_key(path):
     166            raise ftplib.error_perm
     167        # Fail if path isn't available (this name is hard-coded here
     168        # and has to be used for the corresponding tests).
     169        if (cmd, path) == ('RETR', 'notthere'):
     170            raise ftplib.error_perm
     171        assert self._transfercmds == 0
     172        self._transfercmds = self._transfercmds + 1
     173        return MockSocket(path, self.mock_file_content)
     174
     175    def close(self):
     176        if not self.closed:
     177            self.closed = 1
     178            assert self._transfercmds == 0
     179
     180
     181class MockUnixSession(MockSession):
     182
    86183    dir_contents = {
    87184      '/': """\
     
    121218    }
    122219
    123     # File content to be used (indirectly) with `transfercmd`.
    124     mock_file_content = ''
    125 
    126     def __init__(self, host='', user='', password=''):
    127         self.closed = 0
    128         # Count successful `transfercmd` invocations to ensure that
    129         # each has a corresponding `voidresp`.
    130         self._transfercmds = 0
    131         # Dummy, only for getting/setting timeout in `_FTPFile.close`
    132         self.sock = MockSocket("", "")
    133 
    134     def voidcmd(self, cmd):
    135         if DEBUG:
    136             print cmd
    137         if cmd == 'STAT':
    138             return 'MockSession server awaiting your commands ;-)'
    139         elif cmd.startswith('TYPE '):
    140             return
    141         elif cmd.startswith('SITE CHMOD'):
    142             raise ftplib.error_perm("502 command not implemented")
    143         else:
    144             raise ftplib.error_perm
    145 
    146     def pwd(self):
    147         return self.current_dir
    148 
    149     def _remove_trailing_slash(self, path):
    150         if path != '/' and path.endswith('/'):
    151             path = path[:-1]
    152         return path
    153 
    154     def _transform_path(self, path):
    155         return posixpath.normpath(posixpath.join(self.pwd(), path))
    156 
    157     def cwd(self, path):
    158         self.current_dir = self._transform_path(path)
    159 
    160     def dir(self, *args):
    161         """Provide a callback function for processing each line of
    162         a directory listing. Return nothing.
    163         """
    164         if callable(args[-1]):
    165             callback = args[-1]
    166             args = args[:-1]
    167         else:
    168             callback = None
    169         # Everything before the path argument are options.
    170         path = args[-1]
    171         if DEBUG:
    172             print 'dir: %s' % path
    173         path = self._transform_path(path)
    174         if not self.dir_contents.has_key(path):
    175             raise ftplib.error_perm
    176         dir_lines = self.dir_contents[path].split('\n')
    177         for line in dir_lines:
    178             if callback is None:
    179                 print line
    180             else:
    181                 callback(line)
    182 
    183     def voidresp(self):
    184         assert self._transfercmds == 1
    185         self._transfercmds = self._transfercmds - 1
    186         return '2xx'
    187 
    188     def transfercmd(self, cmd):
    189         """
    190         Return a `MockSocket` object whose `makefile` method will
    191         return a mock file object.
    192         """
    193         if DEBUG:
    194             print cmd
    195         # Fail if attempting to read from/write to a directory
    196         cmd, path = cmd.split()
    197         path = self._remove_trailing_slash(path)
    198         if self.dir_contents.has_key(path):
    199             raise ftplib.error_perm
    200         # Fail if path isn't available (this name is hard-coded here
    201         # and has to be used for the corresponding tests).
    202         if (cmd, path) == ('RETR', 'notthere'):
    203             raise ftplib.error_perm
    204         assert self._transfercmds == 0
    205         self._transfercmds = self._transfercmds + 1
    206         return MockSocket(path, self.mock_file_content)
    207 
    208     def close(self):
    209         if not self.closed:
    210             self.closed = 1
    211             assert self._transfercmds == 0
    212 
    213220
    214221class MockMSFormatSession(MockSession):
Note: See TracChangeset for help on using the changeset viewer.