SaltyCrane Blog — Notes on JavaScript and web development

How to prevent nose (unittest) from using the docstring when verbosity >= 2

Some of our Python unit tests have docstrings. I find it annoying that, when using a verbosity level >= 2, nose prints the docstring instead of the class name and method name. Here's a hack to prevent it from doing that: Add a shortDescription() method to the test case class that returns None.

Here is an example of normal behavior:

import unittest

class MyTestCase(unittest.TestCase):
    def test_with_docstring(self):
        """Test that something does something
        """

    def test_without_docstring(self):
        pass
$ nosetests --verbosity=2 tmp.py
Test that something does something ... ok
test_without_docstring (tmp.MyTestCase) ... ok

Here is an example with the hack to prevent printing the docstring:

import unittest

class MyTestCase(unittest.TestCase):
    def shortDescription(self):
        return None

    def test_with_docstring(self):
        """Test that something does something
        """

    def test_without_docstring(self):
        pass
$ nosetests --verbosity=2 tmp.py
test_with_docstring (tmp.MyTestCase) ... ok
test_without_docstring (tmp.MyTestCase) ... ok
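
If several test classes need the hack, one option is to move it into a mixin. The sketch below is only an illustration: NoDocstringDescriptionMixin and run_and_capture are made-up names, it is written for Python 3, and it runs the tests with the standard library's unittest runner instead of nose so that it is self-contained. The assumption is that both runners consult shortDescription() when building the verbose test description.

```python
import io
import unittest


class NoDocstringDescriptionMixin(object):
    """Hypothetical mixin: report tests by name, never by docstring."""
    def shortDescription(self):
        return None


class MyTestCase(NoDocstringDescriptionMixin, unittest.TestCase):
    def test_with_docstring(self):
        """Test that something does something
        """


def run_and_capture(test_case_class):
    """Run a TestCase class at verbosity 2 and return the runner's output."""
    stream = io.StringIO()
    suite = unittest.TestLoader().loadTestsFromTestCase(test_case_class)
    unittest.TextTestRunner(stream=stream, verbosity=2).run(suite)
    return stream.getvalue()


output = run_and_capture(MyTestCase)
print(output)
```

The mixin has to come before unittest.TestCase in the list of base classes so that its shortDescription() is found first.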

Hack to share & sync Google contacts between Android phones

I want to share and sync (in real time) Google (Gmail) contacts with my wife on our Android 2.3.6 Gingerbread phones. Google does not make this easy to do. Here's the best solution I could come up with (ref whitenack on androidcentral). (Note: these are not our real email addresses.)

  • This contact list resides only on the [email protected] account.
  • Contacts are removed from the "My Contacts" group and instead stored in groups called "Angela" and/or "Eliot". For shared contacts, the contact is in both groups. (Contact groups are like tags. A contact can be in multiple groups at the same time.)
  • Contacts in the "Angela" group show up on Angela's phone and contacts in the "Eliot" group show up on Eliot's phone. Contacts in both groups show up in both phones.
  • On Angela's phone, add the [email protected] account and check the box for syncing Contacts and uncheck the box for syncing Contacts from the [email protected] account.
  • On Angela's phone, check the box for displaying the groups "My Contacts" and "Angela" under the [email protected] account and uncheck all the boxes for displaying contacts on the [email protected] account.
  • On Eliot's phone, check the box for displaying the groups "My Contacts" and "Eliot" under the [email protected] account.
  • On both phones, set the account used for creating new contacts to [email protected]: Contacts -> More -> Settings -> Contact storage -> Select the [email protected] account
  • When a *new* contact is added on either of the phones, it will be added to the "My Contacts" group on the [email protected] account. These contacts later need to be moved to the "Angela" and/or "Eliot" groups from the browser while signed in to the [email protected] account.
  • The [email protected] account will not be able to view, add, or edit contacts from the browser (Gmail).

We are able to share and sync contacts in real time; however, there are annoyances. The main problem is that the contact list lives under one account, so it is not available to the secondary user (my wife) when she is using Gmail or wants to manage contacts in the browser. A second, minor annoyance is that our Android phones don't allow us to assign a contact to a group, so all new contacts added from our phones are added to the generic "My Contacts" group and need to be categorized later from the browser.

I also tried the free Google Apps because it has Contact sharing. However, I could not figure out how to get shared contacts to show up in our phones.

Will upgrading to Android 4.0 ICS help?

Test coverage with nose and coverage.py

It's fun to use nose + coverage.py to show my progress as I write tests. Seeing the bar next to my code change from red to green makes me happy. 100% test coverage does not mean tests are complete. For example, a boolean OR'ed conditional expression may not test all conditions even though the line is marked as covered. Other limitations are discussed here: Flaws in coverage measurement. However, good test coverage is at least a step towards having a good test suite.
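
Here is a small sketch of the OR'ed conditional case. The function can_edit is made up for illustration; the point is only that every line can be executed while one of the conditions is never the deciding one.

```python
def can_edit(is_owner, is_admin):
    # Hypothetical function: a single line containing an OR'ed condition.
    if is_owner or is_admin:
        return True
    return False


# These two calls execute every line, and both outcomes of the "if" ...
first = can_edit(True, False)    # is_admin is never evaluated (short circuit)
second = can_edit(False, False)  # is_admin is evaluated, but it is False

# ... yet no call checks the case where only is_admin is True.
print(first, second)
```

A coverage report for these two calls would show the function as fully covered even though the admin-only case is untested.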

Install nose and coverage.py

Activate your virtualenv and pip install nose and coverage.

$ pip install nose 
$ pip install coverage 

Run it

Here is the command line I use to run the tests. --with-coverage enables the nose-coverage plugin to check test coverage. --cover-erase erases coverage test results from a previous run. --cover-package specifies which Python package to analyze. Specify the package as you would using an import (e.g. dp.blueprints.info.views). If --cover-package is not specified, it will analyze everything. --cover-html enables pretty HTML coverage reports. This example is for the flask-encryptedsession tests.

$ nosetests --with-coverage --cover-erase --cover-package=flask_encryptedsession --cover-html
..........
Name                                      Stmts   Miss  Cover   Missing
-----------------------------------------------------------------------
flask_encryptedsession                        0      0   100%   
flask_encryptedsession.encryptedcookie       41      1    98%   176
flask_encryptedsession.encryptedsession      35      1    97%   75
-----------------------------------------------------------------------
TOTAL                                        76      2    97%   
----------------------------------------------------------------------
Ran 10 tests in 0.188s

OK

Display the HTML report

$ firefox cover/index.html 

Get branch coverage

Branch coverage is useful for checking "if" statements without an explicit "else" in the code. When I first wrote this, I had to install the development version of nose to use this feature; as of nose 1.2.0, it is available in a regular release.

$ pip install https://github.com/nose-devs/nose/tarball/master 
$ nosetests --cover-branches --with-coverage --cover-erase --cover-package=flask_encryptedsession --cover-html 
..........
Name                                      Stmts   Miss Branch BrPart  Cover   Missing
-------------------------------------------------------------------------------------
flask_encryptedsession                        0      0      0      0   100%   
flask_encryptedsession.encryptedcookie       41      1     12      1    96%   176
flask_encryptedsession.encryptedsession      35      1      4      1    95%   75
-------------------------------------------------------------------------------------
TOTAL                                        76      2     16      2    96%   
----------------------------------------------------------------------
Ran 10 tests in 0.234s

OK
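
To make the "if" without "else" case concrete, here is a minimal sketch. The function normalize is hypothetical and is not part of flask-encryptedsession.

```python
def normalize(name, strip=True):
    # Hypothetical function with an "if" that has no explicit "else".
    if strip:
        name = name.strip()
    return name.lower()


# This single call executes every statement in normalize() ...
result = normalize('  Alice ')
print(result)
# ... but the path that skips the "if" body (strip=False) is never taken.
```

With statement coverage alone, this one call is enough to mark every line as covered. With branch coverage, the untaken jump from the "if" straight to the return is what should show up as a partial branch.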

Pycon 2012 talks that I saw that I enjoyed

The Pycon 2012 videos are up at pyvideo.org. Here are some of the talks I saw and enjoyed. I know I probably missed some great talks so I will try to watch more online. Let me know if there are some that I should not miss.

Favorite talk of the conference

Other great talks (in chronological order)

A unique Python redis-based queue with delay

This is a simple Redis-based queue. Two features that I needed were uniqueness (i.e. if an item is already in the queue, adding it again does not create a second entry; it only resets the item's delay) and a delay, like beanstalkd, where an item must wait a specified time before it can be popped from the queue. There are a number of other Redis-based queues that have many more features but I didn't see one that had these two features together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.

Note: I wrote this in May 2010. I ended up using this solution after trying out beanstalkd and Gearman.

Install

Install on Ubuntu 10.10 Maverick

  • Install the redis server
    $ sudo apt-get install redis-server 
  • Install the python redis client
    $ pip install redis 
  • Default conf file: /etc/redis/redis.conf
    Default log file: /var/log/redis/redis-server.log
    Default db dir: /var/lib/redis
    Stop redis server: sudo /etc/init.d/redis-server stop
    Start redis server: sudo /etc/init.d/redis-server start

Redis commands used

The queue is based on the redis sorted set data type and uses the following commands:

  • ZADD - Add members to a sorted set, or update its score if it already exists
  • ZRANGEBYSCORE - Return a range of members in a sorted set, by score
  • ZREM - Remove one or more members from a sorted set

Code

import time
import redis


REDIS_ADDRESS = '127.0.0.1'


class UniqueMessageQueueWithDelay(object):
    """A message queue based on the Redis sorted set data type. Duplicate items
    in the queue are not allowed. When a duplicate item is added to the queue,
    the new item is added, and the old duplicate item is removed. A delay may be
    specified when adding items to the queue. Items will only be popped after
    the delay has passed. Pop() is non-blocking, so polling must be used. The
    name of the queue == the Redis key for the sorted set.
    """
    def __init__(self, name):
        self.name = name
        self.redis = redis.Redis(REDIS_ADDRESS)

    def add(self, data, delay=0):
        """Add an item to the queue. delay is in seconds.
        """
        score = time.time() + delay
        self.redis.zadd(self.name, data, score)
        debug('Added %.1f, %s' % (score, data))

    def pop(self):
        """Return one item from the front of the queue without deleting it
        (call remove() once the item has been handled). Items are returned
        only if the delay specified in the add() has passed. Return False if
        no items are available.
        """
        min_score = 0
        max_score = time.time()
        result = self.redis.zrangebyscore(
            self.name, min_score, max_score, start=0, num=1, withscores=False)
        if result is None:
            return False
        if len(result) == 1:
            debug('Popped %s' % result[0])
            return result[0]
        else:
            return False

    def remove(self, data):
        return self.redis.zrem(self.name, data)


def debug(msg):
    print msg


def test_queue():
    u = UniqueMessageQueueWithDelay('myqueue')

    # add items to the queue
    for i in [0, 1, 2, 3, 4, 0, 1]:
        data = 'Item %d' % i
        delay = 5
        u.add(data, delay)
        time.sleep(0.1)

    # get items from the queue
    while True:
        print
        result = u.pop()
        print result
        if result is not False:
            u.remove(result)
        time.sleep(1)


if __name__ == '__main__':
    test_queue()

Results:

Added 1320773851.8, Item 0
Added 1320773851.9, Item 1
Added 1320773852.0, Item 2
Added 1320773852.1, Item 3
Added 1320773852.2, Item 4
Added 1320773852.3, Item 0
Added 1320773852.4, Item 1

False

False

False

False

False

Popped Item 2
Item 2

Popped Item 3
Item 3

Popped Item 4
Item 4

Popped Item 0
Item 0

Popped Item 1
Item 1

False

False

False
^CTraceback (most recent call last):
  File "umqwdredisqueue.py", line 102, in <module>
    test_queue()
  File "umqwdredisqueue.py", line 98, in test_queue
    time.sleep(1)
KeyboardInterrupt
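
In the results above, Item 0 and Item 1 are popped last because adding them a second time moved their scores later. To follow that logic without a Redis server, here is an in-memory sketch written for Python 3. It is a stand-in, not Redis: a plain dict plays the role of the sorted set, the class name InMemoryUniqueDelayQueue is made up, and the now argument exists only so the example does not have to sleep.

```python
import time


class InMemoryUniqueDelayQueue(object):
    """Stand-in for the Redis sorted set: maps each item to its score."""
    def __init__(self):
        self.scores = {}

    def add(self, data, delay=0, now=None):
        # Like ZADD: a repeated item keeps one entry and gets a new score.
        now = time.time() if now is None else now
        self.scores[data] = now + delay

    def pop(self, now=None):
        # Like ZRANGEBYSCORE 0..now with a limit of 1: the lowest-scored
        # item whose delay has passed. The item is not deleted.
        now = time.time() if now is None else now
        ready = [(score, data) for data, score in self.scores.items()
                 if score <= now]
        if not ready:
            return False
        return min(ready)[1]

    def remove(self, data):
        # Like ZREM: 1 if the item was present, otherwise 0.
        return 1 if self.scores.pop(data, None) is not None else 0


q = InMemoryUniqueDelayQueue()
q.add('Item 0', delay=5, now=100.0)
q.add('Item 1', delay=5, now=100.1)
q.add('Item 0', delay=5, now=100.2)  # duplicate: one entry, later score

early = q.pop(now=104.0)    # no delay has passed yet
later = q.pop(now=105.15)   # Item 1 is ready; Item 0 was pushed back
print(early, later)
```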

See also

Python gnupg (GPG) example

python-gnupg is a Python package for encrypting and decrypting strings or files using GNU Privacy Guard (GnuPG or GPG). GPG is an open source alternative to Pretty Good Privacy (PGP). A popular use of GPG and PGP is encrypting email. For more information, see the python-gnupg documentation. Another option for encrypting data from Python is keyczar.

Install

This installs the Ubuntu GPG package, creates a test user, and installs the Python package, python-gnupg. This was installed on Ubuntu 10.10 Maverick Meerkat.

$ sudo apt-get install gnupg 
$ sudo adduser testgpguser 
$ sudo su testgpguser 
$ cd 
$ virtualenv --no-site-packages venv 
$ source venv/bin/activate 
$ pip install python-gnupg 

Generate a key

This creates a GPG key. This also creates the gpghome directory if it does not exist. You may need to supply random hardware activity during the key generation. See the docs for more information. To generate random numbers, you can also install the rng-tools package.

$ sudo apt-get install rng-tools 
import os
import gnupg

os.system('rm -rf /home/testgpguser/gpghome')
gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
input_data = gpg.gen_key_input(
    name_email='testgpguser@mydomain.com',
    passphrase='my passphrase')
key = gpg.gen_key(input_data)
print key
B0F4CF530036CE8CD1C064F17D32CEE72C015CD5

Export keys

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
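# Fingerprint printed by gen_key() in the previous step
key = 'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5'
ascii_armored_public_keys = gpg.export_keys(key)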
ascii_armored_private_keys = gpg.export_keys(key, True)
with open('mykeyfile.asc', 'w') as f:
    f.write(ascii_armored_public_keys)
    f.write(ascii_armored_private_keys)
(venv)testgpguser@mymachine:~$ cat mykeyfile.asc 
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)

mI0ETqrVGAEEAP42Xs1vQv40MxA3/g/Le5B0VatnDYaSvAhiYfaub79HY4mjYcCD
FPDo5b54PSzyhlVsz5RL46+RE9NpQ2JdvFofWi7eVzfdmmTtNYEaiUSmzLUq73Vz
qu7P1RhOfwuAyW0otnw/Lw54MVjVZblvp3ln1Fcpleb9ZSrY1h61Y8pHABEBAAG0
REF1dG9nZW5lcmF0ZWQgS2V5IChHZW5lcmF0ZWQgYnkgZ251cGcucHkpIDx0ZXN0
Z3BndXNlckBteWRvbWFpbi5jb20+iLgEEwECACIFAk6q1RgCGy8GCwkIBwMCBhUI
AgkKCwQWAgMBAh4BAheAAAoJEH0yzucsAVzVBjwD/1KgTx1y3cpuumu1HF0GtQV0
Wn7l9OaSj98CqQ/f2emHD1l9rrjdt9jm1g7wSsWumpKs57vxz7NXwHw7mI4qZ5m0
cvg/qRc/BBMP8v2WgzRsmls97Pplaate1k3QfvDCVs6F1qiIQyELffjxBHbmWPhx
XEwhnpLcvk2l7NbNnEwA
=exDD
-----END PGP PUBLIC KEY BLOCK-----
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)

lQH+BE6q1RgBBAD+Nl7Nb0L+NDMQN/4Py3uQdFWrZw2GkrwIYmH2rm+/R2OJo2HA
gxTw6OW+eD0s8oZVbM+US+OvkRPTaUNiXbxaH1ou3lc33Zpk7TWBGolEpsy1Ku91
c6ruz9UYTn8LgMltKLZ8Py8OeDFY1WW5b6d5Z9RXKZXm/WUq2NYetWPKRwARAQAB
/gMDAq5W6uxeU2hDYDPZ1Yy+e97ppNXmdAeq1urZHmiPr4+a36nOWd6j0R/HBjG3
ELD8CqYiQ0vx8+F9rY/uwKga2bEkJsQXjvaaZtu97lzPyp2+avsaw2G+3jRAJWNL
5YG4c/XwK1cfEajM23f7zz/t6TRWG+Ve2Dzi7+obA0LuF8czSlpiTTEzLDk8QJCK
y2WmrZ+s+POWv3itVpI26o7PvTQESzwyKXdyCW2W66VnXTm4mQEL6kgyV0oO6xIl
QUVSn2XWvwFMg2iL+02zA467rsr1x6Nl8hEQJgFwJCejD2z+4C4yzEeQGFP9WUps
pbMedAjDHebhC9FzbW7yuQ3H7iTCK1mvidAFw2wTdrkH61ApzmSo/rSTSxXw7hLT
M/ONgYZtvr+CpJj+mIu1XvVDiftvMhXlwcvM8c9PB3zv+086K7kJDTnzPgYvL0H/
+V2b9X9BBfAax40MQuxZJWseaLtsxXyl/rhn8jSCFZoqtERBdXRvZ2VuZXJhdGVk
IEtleSAoR2VuZXJhdGVkIGJ5IGdudXBnLnB5KSA8dGVzdGdwZ3VzZXJAbXlkb21h
aW4uY29tPoi4BBMBAgAiBQJOqtUYAhsvBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIX
gAAKCRB9Ms7nLAFc1QY8A/9SoE8dct3KbrprtRxdBrUFdFp+5fTmko/fAqkP39np
hw9Zfa643bfY5tYO8ErFrpqSrOe78c+zV8B8O5iOKmeZtHL4P6kXPwQTD/L9loM0
bJpbPez6ZWmrXtZN0H7wwlbOhdaoiEMhC3348QR25lj4cVxMIZ6S3L5NpezWzZxM
AA==
=v9Z7
-----END PGP PRIVATE KEY BLOCK-----

Import keys

import gnupg
from pprint import pprint

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
key_data = open('mykeyfile.asc').read()
import_result = gpg.import_keys(key_data)
pprint(import_result.results)
[{'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'ok': u'0',
  'text': 'Not actually changed\n'},
 {'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'ok': u'16',
  'text': 'Contains private key\nNot actually changed\n'}]
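
The 'ok' values in the import results look like flag values. The sketch below decodes them with a hypothetical helper, describe_import_ok, which is not part of python-gnupg. The flag table is my reading of the IMPORT_OK status line in GnuPG's doc/DETAILS file, so treat it as an assumption and check it against your GnuPG version. Unlike the 'text' field above, the helper reports "Not actually changed" only when no flag is set.

```python
# Flag values as listed for IMPORT_OK in GnuPG's doc/DETAILS (an assumption
# here; verify against the documentation for your GnuPG version).
IMPORT_OK_FLAGS = [
    (1, 'Entirely new key'),
    (2, 'New user IDs'),
    (4, 'New signatures'),
    (8, 'New subkeys'),
    (16, 'Contains private key'),
]


def describe_import_ok(ok):
    """Return the descriptions of the flags set in an 'ok' value."""
    value = int(ok)
    if value == 0:
        return ['Not actually changed']
    return [text for bit, text in IMPORT_OK_FLAGS if value & bit]


print(describe_import_ok('0'))
print(describe_import_ok('16'))
```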

List keys

import gnupg
from pprint import pprint

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
public_keys = gpg.list_keys()
private_keys = gpg.list_keys(True)
print 'public keys:'
pprint(public_keys)
print 'private keys:'
pprint(private_keys)
public keys:
[{'algo': u'1',
  'date': u'1319818520',
  'dummy': u'',
  'expires': u'',
  'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'keyid': u'7D32CEE72C015CD5',
  'length': u'1024',
  'ownertrust': u'u',
  'trust': u'u',
  'type': u'pub',
  'uids': [u'Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>']}]
private keys:
[{'algo': u'1',
  'date': u'1319818520',
  'dummy': u'',
  'expires': u'',
  'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'keyid': u'7D32CEE72C015CD5',
  'length': u'1024',
  'ownertrust': u'',
  'trust': u'',
  'type': u'sec',
  'uids': [u'Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>']}]

Encrypt a string

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
unencrypted_string = 'Who are you? How did you get in my house?'
encrypted_data = gpg.encrypt(unencrypted_string, 'testgpguser@mydomain.com')
encrypted_string = str(encrypted_data)
print 'ok: ', encrypted_data.ok
print 'status: ', encrypted_data.status
print 'stderr: ', encrypted_data.stderr
print 'unencrypted_string: ', unencrypted_string
print 'encrypted_string: ', encrypted_string
ok:  True
status:  encryption ok
stderr:  [GNUPG:] BEGIN_ENCRYPTION 2 9
[GNUPG:] END_ENCRYPTION

unencrypted_string:  Who are you? How did you get in my house?
encrypted_string:  -----BEGIN PGP MESSAGE-----
Version: GnuPG v1.4.10 (GNU/Linux)

hIwDFuhrAS77HYIBBACXqZ66rkGQv8yE61JddEmad3fUNvbfkhBPUI9OSaMO3PbN
Q/6SIDyi3FmhbM9icOBS7q3xddQpvFhwmrq9e3VLKnV3NSmWo+xJWosQ/GNAA/Hb
cwF1pOtR6bRHFBkqtmpTYnBo9rMpokW8lp4WxFxMda+af8TlId8HC0WcRUg4kNJi
AdV1fsd+sD/cGIp0cAltpaVuO4/uwV9lKd39VER6WigLDaeFUHjWhJbcHwTaJYHj
qmy5LRciNSjwsqeMK4zOFZyRPUqPVKwWLiE9kImMni0Nj/K54ElWujgTttZIlBqV
5+c=
=SM4r
-----END PGP MESSAGE-----

Decrypt a string

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
unencrypted_string = 'Who are you? How did you get in my house?'
encrypted_data = gpg.encrypt(unencrypted_string, 'testgpguser@mydomain.com')
encrypted_string = str(encrypted_data)
decrypted_data = gpg.decrypt(encrypted_string, passphrase='my passphrase')

print 'ok: ', decrypted_data.ok
print 'status: ', decrypted_data.status
print 'stderr: ', decrypted_data.stderr
print 'decrypted string: ', decrypted_data.data
ok:  True
status:  decryption ok
stderr:  [GNUPG:] ENC_TO 16E86B012EFB1D82 1 0
[GNUPG:] USERID_HINT 16E86B012EFB1D82 Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>
[GNUPG:] NEED_PASSPHRASE 16E86B012EFB1D82 16E86B012EFB1D82 1 0
[GNUPG:] GOOD_PASSPHRASE
gpg: encrypted with 1024-bit RSA key, ID 2EFB1D82, created 2011-11-02
      "Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>"
[GNUPG:] BEGIN_DECRYPTION
[GNUPG:] PLAINTEXT 62 1320545729 
[GNUPG:] PLAINTEXT_LENGTH 41
[GNUPG:] DECRYPTION_OKAY
[GNUPG:] GOODMDC
[GNUPG:] END_DECRYPTION

decrypted string:  Who are you? How did you get in my house?

Encrypt a file

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
with open('my-unencrypted.txt', 'w') as f:
    f.write('You need to Google Venn diagram.')
with open('my-unencrypted.txt', 'rb') as f:
    status = gpg.encrypt_file(
        f, recipients=['testgpguser@mydomain.com'],
        output='my-encrypted.txt.gpg')

print 'ok: ', status.ok
print 'status: ', status.status
print 'stderr: ', status.stderr
ok:  True
status:  encryption ok
stderr:  [GNUPG:] BEGIN_ENCRYPTION 2 9
[GNUPG:] END_ENCRYPTION
(venv)testgpguser@mymachine:~$  cat my-encrypted.txt.gpg 
-----BEGIN PGP MESSAGE-----
Version: GnuPG v1.4.10 (GNU/Linux)

hIwDfTLO5ywBXNUBBADo7trFZUD6Ir1vPRAJsoQXDiiw32N1m9/PXWCnQqX0nyzW
LfluNMfLFQRclNPVEg+o91qhS71apKvagp8DW7SCDE2SdCYk8nAS3bwAg5+GUyDs
XY2E6BQ1cLA1eK1V6D15ih6cq0laRzWuFkehH9PQ5Yp4ZZOmCbopw7dufnYPjdJb
AVGLpZRq64SuN1BUWIHbO7vqQGFq7qhGQwuegblEMm4vyr6FBW6JA/x4G/PMfImZ
1cH6KBrWGWrLCTiU/FKG9JvOm8mg8NXzd/TVjPs6rHRaKPFln37T7cLUwA==
=FSQP
-----END PGP MESSAGE-----

Decrypt a file

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
with open('my-encrypted.txt.gpg', 'rb') as f:
    status = gpg.decrypt_file(f, passphrase='my passphrase', output='my-decrypted.txt')

print 'ok: ', status.ok
print 'status: ', status.status
print 'stderr: ', status.stderr
ok:  True
status:  decryption ok
stderr:  [GNUPG:] ENC_TO 16E86B012EFB1D82 1 0
[GNUPG:] USERID_HINT 16E86B012EFB1D82 Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>
[GNUPG:] NEED_PASSPHRASE 16E86B012EFB1D82 16E86B012EFB1D82 1 0
[GNUPG:] GOOD_PASSPHRASE
gpg: encrypted with 1024-bit RSA key, ID 2EFB1D82, created 2011-11-02
      "Autogenerated Key (Generated by gnupg.py) <testgpguser@mydomain.com>"
[GNUPG:] BEGIN_DECRYPTION
[GNUPG:] PLAINTEXT 62 1320546031 
[GNUPG:] PLAINTEXT_LENGTH 32
[GNUPG:] DECRYPTION_OKAY
[GNUPG:] GOODMDC
[GNUPG:] END_DECRYPTION
(venv)testgpguser@mymachine:~$ cat my-decrypted.txt 
You need to Google Venn diagram.

Using curl over ftp took 3+ minutes for a 4 byte file w/ EPSV

It took over 3 minutes to download a 4 byte file with curl via ftp. It took less than a second with wget.

$ time curl -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt 
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0      0      0 --:--:--  0:03:09 --:--:--     0

real    3m9.411s
user    0m0.000s
sys     0m0.050s

Running curl with the verbose option

Running curl with the verbose option showed it was failing in "Extended Passive Mode".

$ curl -v -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt
* About to connect() to ftp.myserver.com port 21 (#0)
*   Trying 10.1.2.102...   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0connected
* Connected to ftp.myserver.com (10.1.2.102) port 21 (#0)
< 220 Welcome! 01-srv.
> USER myusername
< 331 Please specify the password.
> PASS mypassword
< 230 Login successful.
> PWD
< 257 "/"
* Entry path is '/'
> CWD dmi
< 250 Directory successfully changed.
> EPSV
* Connect data stream passively
< 229 Entering Extended Passive Mode (|||58226|)
  0     0    0     0    0     0      0      0 --:--:--  0:03:09 --:--:--     0Connection timed out
* couldn't connect to host
* got positive EPSV response, but can't connect. Disabling EPSV
> PASV
< 227 Entering Passive Mode (10,1,2,102,190,42)
*   Trying 10.1.2.102... connected
* Connecting to 10.1.2.102 (10.1.2.102) port 48682
> TYPE I
< 200 Switching to Binary mode.
> SIZE testfile.txt
< 213 4
> RETR testfile.txt
< 150 Opening BINARY mode data connection for testfile.txt (4 bytes).
* Maxdownload = -1
* Getting file with size: 4
{ [data not shown]
* Remembering we are in dir "path/to/"
< 226 File send OK.

Disabling EPSV

I googled and found that some servers have a problem using Extended Passive Mode (EPSV). To disable Extended Passive Mode, use the --disable-epsv option. With this option, the download took 0.046 seconds.

$ time curl --disable-epsv -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4  100     4    0     0    112      0 --:--:-- --:--:-- --:--:--   133

real    0m0.046s
user    0m0.010s
sys     0m0.000s

Disabling EPSV with libcurl

Set CURLOPT_FTP_USE_EPSV to 0. See http://curl.haxx.se/libcurl/c/curl_easy_setopt.html.

Disabling EPSV with pycurl

import pycurl

c = pycurl.Curl()
c.setopt(c.FTP_USE_EPSV, 0)
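
Looking back at the verbose log, the two passive-mode replies carry the data port in different forms: the 229 (EPSV) reply gives the port number directly, while the 227 (PASV) reply gives it as two bytes after the four address bytes. Here is a small sketch that pulls the port out of both replies. The function names are made up, it is written for Python 3, and the EPSV pattern assumes the "|" delimiter seen in the log.

```python
import re


def parse_pasv_port(reply):
    """Return the data port from a 227 (PASV) reply: p1 * 256 + p2."""
    match = re.search(r'\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)', reply)
    p1, p2 = int(match.group(5)), int(match.group(6))
    return p1 * 256 + p2


def parse_epsv_port(reply):
    """Return the data port from a 229 (EPSV) reply such as (|||58226|)."""
    return int(re.search(r'\(\|\|\|(\d+)\|\)', reply).group(1))


pasv_port = parse_pasv_port('227 Entering Passive Mode (10,1,2,102,190,42)')
epsv_port = parse_epsv_port('229 Entering Extended Passive Mode (|||58226|)')
print(pasv_port, epsv_port)
```

The PASV result, 190 * 256 + 42 = 48682, matches the port curl connected to in the log after it fell back from EPSV.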

Some more python recursion examples

I had a number of yaml files that contained passwords that needed encrypting. After parsing the yaml file with pyyaml, the data looked something like this:

EXAMPLE_DATA = {
    'jobs': [{'frequency': '* * * * *',
              'jobconfig': [{'config': [('*',
                                         {'maxspeed': 1048576,
                                          'password': 'onesecretpassword',
                                          'port': 22,
                                          'url': 'basset://basset1.domain.com/tootsiepop/123.csv',
                                          'username': 'myusername'})],
                             'hasbro': 'basset'},
                            {'config': [('*',
                                         {'field_delim': ',',
                                          'field_line': True,
                                          'no_blanks': True,
                                          'quote_char': '"'})],
                             'hasbro': 'pen'},
                            {'config': [('*',
                                         {'db_database': 'mydatabase',
                                          'db_host': 'myhost',
                                          'db_password': 'anothersecretpassword',
                                          'db_table': 'mytable',
                                          'db_user': 'myuser'})],
                             'hasbro': 'dart'}],
              'jobdesc': 'Data from tootsiepop',
              'jobname': 'tootsiepop',
              'max_records_fail': '110%',
              'min_failure_time': '1000y'}],
    'vendor': 'tootsiepop'}

Here is a recursive function that prints all the leaf nodes of my nested data structure.

def print_all_leaf_nodes(data):
    if isinstance(data, dict):
        for item in data.values():
            print_all_leaf_nodes(item)
    elif isinstance(data, list) or isinstance(data, tuple):
        for item in data:
            print_all_leaf_nodes(item)
    else:
        print data

print_all_leaf_nodes(EXAMPLE_DATA)

Results:

tootsiepop
1000y
tootsiepop
*
basset://basset1.domain.com/tootsiepop/123.csv
myusername
onesecretpassword
1048576
22
basset
*
True
"
,
True
pen
*
anothersecretpassword
mytable
myhost
mydatabase
myuser
dart
* * * * *
110%
Data from tootsiepop

Get all leaf nodes

This function returns all leaf nodes as a list instead of printing them. A wrapper function is used to create a Namespace instance to hold the results variable. This could alternatively be stored in a global (module-level) variable. See my notes on variable scope for more info about using a class as a namespace.

def get_all_leaf_nodes(data):
    class Namespace(object):
        pass
    ns = Namespace()
    ns.results = []

    def inner(data):
        if isinstance(data, dict):
            for item in data.values():
                inner(item)
        elif isinstance(data, list) or isinstance(data, tuple):
            for item in data:
                inner(item)
        else:
            ns.results.append(data)

    inner(data)
    return ns.results

from pprint import pprint
pprint(get_all_leaf_nodes(EXAMPLE_DATA))

Results:

['tootsiepop',
 '1000y',
 'tootsiepop',
 '*',
 'basset://basset1.domain.com/tootsiepop/123.csv',
 'myusername',
 'onesecretpassword',
 1048576,
 22,
 'basset',
 '*',
 True,
 '"',
 ',',
 True,
 'pen',
 '*',
 'anothersecretpassword',
 'mytable',
 'myhost',
 'mydatabase',
 'myuser',
 'dart',
 '* * * * *',
 '110%',
 'Data from tootsiepop']
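
Another way to collect the results without a Namespace or a global is a generator. This is a sketch for Python 3 only (it relies on yield from); iter_leaf_nodes and the small sample dict are made up for illustration.

```python
def iter_leaf_nodes(data):
    """Yield every leaf value of a nested dict/list/tuple structure."""
    if isinstance(data, dict):
        for item in data.values():
            yield from iter_leaf_nodes(item)
    elif isinstance(data, (list, tuple)):
        for item in data:
            yield from iter_leaf_nodes(item)
    else:
        yield data


sample = {'jobs': [{'jobconfig': [('*', {'port': 22})],
                    'jobname': 'tootsiepop'}]}
leaves = list(iter_leaf_nodes(sample))
print(leaves)
```

The caller decides what to do with the values: wrap the call in list() to get the same kind of result as get_all_leaf_nodes, or loop over it directly.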

Get all leaf key value pairs

This function gets all key-value pairs where the value is not a compound data structure (i.e. a dict, list, or tuple).

def get_all_key_value_pairs_where_values_are_simple(data):
    class Namespace(object):
        pass
    ns = Namespace()
    ns.results = []

    def inner(data):
        if isinstance(data, dict):
            for k, v in data.iteritems():
                if (isinstance(v, dict) or
                    isinstance(v, list) or
                    isinstance(v, tuple)
                    ):
                    inner(v)
                else:
                    ns.results.append((k, v))
        elif isinstance(data, list) or isinstance(data, tuple):
            for item in data:
                inner(item)

    inner(data)
    return ns.results

from pprint import pprint
pprint(get_all_key_value_pairs_where_values_are_simple(EXAMPLE_DATA))

Results:

[('vendor', 'tootsiepop'),
 ('min_failure_time', '1000y'),
 ('jobname', 'tootsiepop'),
 ('url', 'basset://basset1.domain.com/tootsiepop/123.csv'),
 ('username', 'myusername'),
 ('password', 'onesecretpassword'),
 ('maxspeed', 1048576),
 ('port', 22),
 ('hasbro', 'basset'),
 ('field_line', True),
 ('quote_char', '"'),
 ('field_delim', ','),
 ('no_blanks', True),
 ('hasbro', 'pen'),
 ('db_password', 'anothersecretpassword'),
 ('db_table', 'mytable'),
 ('db_host', 'myhost'),
 ('db_database', 'mydatabase'),
 ('db_user', 'myuser'),
 ('hasbro', 'dart'),
 ('frequency', '* * * * *'),
 ('max_records_fail', '110%'),
 ('jobdesc', 'Data from tootsiepop')]

Modify values of terminal key value in a nested dict

This function modifies every dict value that is not a compound data structure (i.e. a dict, list, or tuple). The modfn argument is a function that computes the new value. It should accept two arguments, a key and a value, and return the modified value.

The example function, super_secure_encrypt, checks if the string 'password' is in the key, and "encrypts" the value using the <sarcasm>super secure</sarcasm> ROT13 algorithm. (We are actually using the keyczar toolkit from Google to do the encryption.)

def modify_all_simple_dict_values(data, modfn):
    if isinstance(data, dict):
        for k, v in data.iteritems():
            if (isinstance(v, dict) or
                isinstance(v, list) or
                isinstance(v, tuple)
                ):
                modify_all_simple_dict_values(v, modfn)
            else:
                data[k] = modfn(k, v)
    elif isinstance(data, list) or isinstance(data, tuple):
        for item in data:
            modify_all_simple_dict_values(item, modfn)

    return data


def super_secure_encrypt(key, value):
    if 'password' in key:
        value = value.encode('rot13')
    return value


from pprint import pprint
pprint(modify_all_simple_dict_values(EXAMPLE_DATA, super_secure_encrypt))

Results:

{'jobs': [{'frequency': '* * * * *',
           'jobconfig': [{'config': [('*',
                                      {'maxspeed': 1048576,
                                       'password': 'barfrpergcnffjbeq',
                                       'port': 22,
                                       'url': 'basset://basset1.domain.com/tootsiepop/123.csv',
                                       'username': 'myusername'})],
                          'hasbro': 'basset'},
                         {'config': [('*',
                                      {'field_delim': ',',
                                       'field_line': True,
                                       'no_blanks': True,
                                       'quote_char': '"'})],
                          'hasbro': 'pen'},
                         {'config': [('*',
                                      {'db_database': 'mydatabase',
                                       'db_host': 'myhost',
                                       'db_password': 'nabgurefrpergcnffjbeq',
                                       'db_table': 'mytable',
                                       'db_user': 'myuser'})],
                          'hasbro': 'dart'}],
           'jobdesc': 'Data from tootsiepop',
           'jobname': 'tootsiepop',
           'max_records_fail': '110%',
           'min_failure_time': '1000y'}],
 'vendor': 'tootsiepop'}
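
The function above changes EXAMPLE_DATA in place. If the original structure should stay untouched, a variant can build a transformed copy instead. This is a sketch for Python 3, where str has no 'rot13' encoding, so it goes through the codecs module; map_simple_dict_values, rot13_passwords, and the small sample dict are names made up for this example.

```python
import codecs


def map_simple_dict_values(data, modfn):
    """Return a copy of data with modfn(key, value) applied to every dict
    value that is not a dict, list, or tuple."""
    if isinstance(data, dict):
        result = {}
        for k, v in data.items():
            if isinstance(v, (dict, list, tuple)):
                result[k] = map_simple_dict_values(v, modfn)
            else:
                result[k] = modfn(k, v)
        return result
    if isinstance(data, (list, tuple)):
        # Rebuild the same container type (list or tuple).
        return type(data)(map_simple_dict_values(item, modfn) for item in data)
    return data


def rot13_passwords(key, value):
    if 'password' in key:
        value = codecs.encode(value, 'rot_13')
    return value


sample = {'config': [('*', {'password': 'onesecretpassword', 'port': 22})]}
result = map_simple_dict_values(sample, rot13_passwords)
print(result)
```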

How to remove ^M characters from a file with Python

Use the following Python script to remove ^M (carriage return) characters from the ends of lines in your file, leaving only newline characters. The original file is kept with a .bak extension. To do this in Emacs, see my notes here.

remove_ctrl_m_chars.py:

import os
import sys
import tempfile


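def main():
    filename = sys.argv[1]
    # Create the temp file beside the original so that os.rename() never has
    # to cross filesystems.
    dirname = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(dir=dirname, delete=False) as fh:
        with open(filename, 'rb') as infile:
            for line in infile:
                # Strip only the line ending; keep other trailing whitespace.
                fh.write(line.rstrip(b'\r\n') + b'\n')
    # The temp file is closed and flushed here; keep a backup of the original.
    os.rename(filename, filename + '.bak')
    os.rename(fh.name, filename)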


if __name__ == '__main__':
    main()

Run it

$ python remove_ctrl_m_chars.py myfile.txt 

Documentation

Notes on debugging ssh connection problems

  • Run the ssh client in verbose mode
    $ ssh -vvv user@host 
    
  • On the server, check auth.log for errors
    $ sudo tail -f /var/log/auth.log 
    

    On Red Hat, it's /var/log/secure

  • For more debugging info, (assuming you have control of the ssh server) run the sshd server in debug mode on another port
    $ sudo /usr/sbin/sshd -ddd -p 33333 
    
    Then specify the port, -p 33333 with the ssh client. e.g.
    $ ssh -vvv -p 33333 user@host 
    

Commands run on Ubuntu 10.04

sftp error: Received message too long 170160758

Problem was in the .bashrc. See http://www.snailbook.com/faq/sftp-corruption.auto.html
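
A quick way to see what the server actually sent: as I understand the protocol and the linked FAQ, the sftp client reads the first four bytes it receives as a big-endian message length, so any text printed by a shell startup file gets interpreted as a huge number. The sketch below (Python 3) turns the number from the error message back into those four bytes.

```python
import struct

# The number reported in the error message above.
reported_length = 170160758

# Pack it back into the four big-endian bytes the client received.
first_bytes = struct.pack('>I', reported_length)
print(repr(first_bytes))
```

Here the bytes come out as a newline followed by "$rv", which is presumably the start of whatever the .bashrc printed.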