Merge remote-tracking branch 'public/bug11608'

This commit is contained in:
Nick Mathewson 2014-04-28 15:52:57 -04:00
commit cdc64f020c
5 changed files with 131 additions and 81 deletions

5
changes/bug11608 Normal file
View File

@ -0,0 +1,5 @@
o Minor bugfixes (testing):
- The Python parts of the test scripts now work on Python 3 as well
as Python 2, so systems where '/usr/bin/python' is Python 3 will
no longer have the tests break. Fixes bug 11608; bugfix on
0.2.5.2-alpha.

View File

@ -35,8 +35,8 @@ LINES = sys.stdin.readlines()
for I in range(len(LINES)):
if matches(LINES[I:], FUNCNAMES):
print "OK"
print("OK")
break
else:
print "BAD"
print("BAD")

View File

@ -39,13 +39,14 @@ except ImportError:
import hashlib
import hmac
import subprocess
import sys
# **********************************************************************
# Helpers and constants
def HMAC(key,msg):
"Return the HMAC-SHA256 of 'msg' using the key 'key'."
H = hmac.new(key, "", hashlib.sha256)
H = hmac.new(key, b"", hashlib.sha256)
H.update(msg)
return H.digest()
@ -67,10 +68,10 @@ G_LENGTH = 32
H_LENGTH = 32
PROTOID = b"ntor-curve25519-sha256-1"
M_EXPAND = PROTOID + ":key_expand"
T_MAC = PROTOID + ":mac"
T_KEY = PROTOID + ":key_extract"
T_VERIFY = PROTOID + ":verify"
M_EXPAND = PROTOID + b":key_expand"
T_MAC = PROTOID + b":mac"
T_KEY = PROTOID + b":key_extract"
T_VERIFY = PROTOID + b":verify"
def H_mac(msg): return H(msg, tweak=T_MAC)
def H_verify(msg): return H(msg, tweak=T_VERIFY)
@ -91,7 +92,14 @@ class PrivateKey(curve25519mod.Private):
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def kdf_rfc5869(key, salt, info, n):
if sys.version < '3':
def int2byte(i):
return chr(i)
else:
def int2byte(i):
return bytes([i])
def kdf_rfc5869(key, salt, info, n):
prk = HMAC(key=salt, msg=key)
@ -99,7 +107,7 @@ def kdf_rfc5869(key, salt, info, n):
last = b""
i = 1
while len(out) < n:
m = last + info + chr(i)
m = last + info + int2byte(i)
last = h = HMAC(key=prk, msg=m)
out += h
i = i + 1
@ -208,7 +216,7 @@ def server(seckey_b, my_node_id, message, keyBytes=72):
pubkey_Y.serialize() +
pubkey_X.serialize() +
PROTOID +
"Server")
b"Server")
msg = pubkey_Y.serialize() + H_mac(auth_input)
@ -270,7 +278,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72):
pubkey_B.serialize() +
pubkey_Y.serialize() +
pubkey_X.serialize() + PROTOID +
"Server")
b"Server")
my_auth = H_mac(auth_input)
@ -284,7 +292,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72):
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()):
def demo(node_id=b"iToldYouAboutStairs.", server_key=PrivateKey()):
"""
Try to handshake with ourself.
"""
@ -294,7 +302,7 @@ def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()):
assert len(skeys) == 72
assert len(ckeys) == 72
assert skeys == ckeys
print "OK"
print("OK")
# ======================================================================
def timing():
@ -304,7 +312,7 @@ def timing():
import timeit
t = timeit.Timer(stmt="ntor_ref.demo(N,SK)",
setup="import ntor_ref,curve25519;N='ABCD'*5;SK=ntor_ref.PrivateKey()")
print t.timeit(number=1000)
print(t.timeit(number=1000))
# ======================================================================
@ -315,7 +323,7 @@ def kdf_vectors():
import binascii
def kdf_vec(inp):
k = kdf(inp, T_KEY, M_EXPAND, 100)
print repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\""
print(repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\"")
kdf_vec("")
kdf_vec("Tor")
kdf_vec("AN ALARMING ITEM TO FIND ON YOUR CREDIT-RATING STATEMENT")
@ -328,13 +336,13 @@ def test_tor():
Call the test-ntor-cl command-line program to make sure we can
interoperate with Tor's ntor program
"""
enhex=binascii.b2a_hex
enhex=lambda s: binascii.b2a_hex(s)
dehex=lambda s: binascii.a2b_hex(s.strip())
PROG = "./src/test/test-ntor-cl"
PROG = b"./src/test/test-ntor-cl"
def tor_client1(node_id, pubkey_B):
" returns (msg, state) "
p = subprocess.Popen([PROG, "client1", enhex(node_id),
p = subprocess.Popen([PROG, b"client1", enhex(node_id),
enhex(pubkey_B.serialize())],
stdout=subprocess.PIPE)
return map(dehex, p.stdout.readlines())
@ -352,7 +360,7 @@ def test_tor():
return map(dehex, p.stdout.readlines())
node_id = "thisisatornodeid$#%^"
node_id = b"thisisatornodeid$#%^"
seckey_b = PrivateKey()
pubkey_B = seckey_b.get_public()
@ -377,14 +385,13 @@ def test_tor():
assert c_keys == s_keys
assert len(c_keys) == 90
print "OK"
print("OK")
# ======================================================================
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print __doc__
print(__doc__)
elif sys.argv[1] == 'gen_kdf_vectors':
kdf_vectors()
elif sys.argv[1] == 'timing':
@ -395,4 +402,4 @@ if __name__ == '__main__':
test_tor()
else:
print __doc__
print(__doc__)

View File

@ -8,12 +8,14 @@
__all__ = ['smult_curve25519_base', 'smult_curve25519']
import sys
P = 2 ** 255 - 19
A = 486662
def expmod(b, e, m):
if e == 0: return 1
t = expmod(b, e / 2, m) ** 2 % m
t = expmod(b, e // 2, m) ** 2 % m
if e & 1: t = (t * b) % m
return t
@ -23,12 +25,14 @@ def inv(x):
# Addition and doubling formulas taken from Appendix D of "Curve25519:
# new Diffie-Hellman speed records".
def add((xn,zn), (xm,zm), (xd,zd)):
def add(n,m,d):
(xn,zn), (xm,zm), (xd,zd) = n, m, d
x = 4 * (xm * xn - zm * zn) ** 2 * zd
z = 4 * (xm * zn - zm * xn) ** 2 * xd
return (x % P, z % P)
def double((xn,zn)):
def double(n):
(xn,zn) = n
x = (xn ** 2 - zn ** 2) ** 2
z = 4 * xn * zn * (xn ** 2 + A * xn * zn + zn ** 2)
return (x % P, z % P)
@ -40,19 +44,34 @@ def curve25519(n, base):
# (m+1)th multiple of base.
def f(m):
if m == 1: return (one, two)
(pm, pm1) = f(m / 2)
(pm, pm1) = f(m // 2)
if (m & 1):
return (add(pm, pm1, one), double(pm1))
return (double(pm), add(pm, pm1, one))
((x,z), _) = f(n)
return (x * inv(z)) % P
if sys.version < '3':
def b2i(c):
return ord(c)
def i2b(i):
return chr(i)
def ba2bs(ba):
return "".join(ba)
else:
def b2i(c):
return c
def i2b(i):
return i
def ba2bs(ba):
return bytes(ba)
def unpack(s):
if len(s) != 32: raise ValueError('Invalid Curve25519 argument')
return sum(ord(s[i]) << (8 * i) for i in range(32))
return sum(b2i(s[i]) << (8 * i) for i in range(32))
def pack(n):
return ''.join([chr((n >> (8 * i)) & 255) for i in range(32)])
return ba2bs([i2b((n >> (8 * i)) & 255) for i in range(32)])
def clamp(n):
n &= ~7

View File

@ -27,6 +27,21 @@ class UnexpectedSuccess(Exception):
class UnexpectedFailure(Exception):
pass
if sys.version < '3':
def b2s(b):
return b
def s2b(s):
return s
def NamedTemporaryFile():
return tempfile.NamedTemporaryFile(delete=False)
else:
def b2s(b):
return str(b, 'ascii')
def s2b(s):
return s.encode('ascii')
def NamedTemporaryFile():
return tempfile.NamedTemporaryFile(mode="w",delete=False,encoding="ascii")
def contents(fn):
f = open(fn)
try:
@ -42,10 +57,10 @@ def run_tor(args, failure=False):
raise UnexpectedFailure()
elif not result and failure:
raise UnexpectedSuccess()
return output
return b2s(output)
def spaceify_fp(fp):
for i in xrange(0, len(fp), 4):
for i in range(0, len(fp), 4):
yield fp[i:i+4]
def lines(s):
@ -62,7 +77,7 @@ def strip_log_junk(line):
def randstring(entropy_bytes):
s = os.urandom(entropy_bytes)
return binascii.b2a_hex(s)
return b2s(binascii.b2a_hex(s))
def findLineContaining(lines, s):
for ln in lines:
@ -74,59 +89,61 @@ class CmdlineTests(unittest.TestCase):
def test_version(self):
out = run_tor(["--version"])
self.failUnless(out.startswith("Tor version "))
self.assertEquals(len(lines(out)), 1)
self.assertTrue(out.startswith("Tor version "))
self.assertEqual(len(lines(out)), 1)
def test_quiet(self):
out = run_tor(["--quiet", "--quumblebluffin", "1"], failure=True)
self.assertEquals(out, "")
self.assertEqual(out, "")
def test_help(self):
out = run_tor(["--help"], failure=False)
out2 = run_tor(["-h"], failure=False)
self.assert_(out.startswith("Copyright (c) 2001"))
self.assert_(out.endswith(
self.assertTrue(out.startswith("Copyright (c) 2001"))
self.assertTrue(out.endswith(
"tor -f <torrc> [args]\n"
"See man page for options, or https://www.torproject.org/ for documentation.\n"))
self.assert_(out == out2)
self.assertTrue(out == out2)
def test_hush(self):
torrc = tempfile.NamedTemporaryFile(delete=False)
torrc = NamedTemporaryFile()
torrc.close()
try:
out = run_tor(["--hush", "-f", torrc.name,
"--quumblebluffin", "1"], failure=True)
finally:
os.unlink(torrc.name)
self.assertEquals(len(lines(out)), 2)
self.assertEqual(len(lines(out)), 2)
ln = [ strip_log_junk(l) for l in lines(out) ]
self.assertEquals(ln[0], "Failed to parse/validate config: Unknown option 'quumblebluffin'. Failing.")
self.assertEquals(ln[1], "Reading config failed--see warnings above.")
self.assertEqual(ln[0], "Failed to parse/validate config: Unknown option 'quumblebluffin'. Failing.")
self.assertEqual(ln[1], "Reading config failed--see warnings above.")
def test_missing_argument(self):
out = run_tor(["--hush", "--hash-password"], failure=True)
self.assertEquals(len(lines(out)), 2)
self.assertEqual(len(lines(out)), 2)
ln = [ strip_log_junk(l) for l in lines(out) ]
self.assertEquals(ln[0], "Command-line option '--hash-password' with no value. Failing.")
self.assertEqual(ln[0], "Command-line option '--hash-password' with no value. Failing.")
def test_hash_password(self):
out = run_tor(["--hash-password", "woodwose"])
result = lines(out)[-1]
self.assertEquals(result[:3], "16:")
self.assertEquals(len(result), 61)
self.assertEqual(result[:3], "16:")
self.assertEqual(len(result), 61)
r = binascii.a2b_hex(result[3:])
self.assertEquals(len(r), 29)
self.assertEqual(len(r), 29)
salt, how, hashed = r[:8], r[8], r[9:]
self.assertEquals(len(hashed), 20)
self.assertEqual(len(hashed), 20)
if type(how) == type("A"):
how = ord(how)
count = (16 + (ord(how) & 15)) << ((ord(how) >> 4) + 6)
stuff = salt + "woodwose"
count = (16 + (how & 15)) << ((how >> 4) + 6)
stuff = salt + s2b("woodwose")
repetitions = count // len(stuff) + 1
inp = stuff * repetitions
inp = inp[:count]
self.assertEquals(hashlib.sha1(inp).digest(), hashed)
self.assertEqual(hashlib.sha1(inp).digest(), hashed)
def test_digests(self):
main_c = os.path.join(TOP_SRCDIR, "src", "or", "main.c")
@ -136,12 +153,14 @@ class CmdlineTests(unittest.TestCase):
out = run_tor(["--digests"])
main_line = [ l for l in lines(out) if l.endswith("/main.c") ]
digest, name = main_line[0].split()
actual = hashlib.sha1(open(main_c).read()).hexdigest()
self.assertEquals(digest, actual)
f = open(main_c, 'rb')
actual = hashlib.sha1(f.read()).hexdigest()
f.close()
self.assertEqual(digest, actual)
def test_dump_options(self):
default_torrc = tempfile.NamedTemporaryFile(delete=False)
torrc = tempfile.NamedTemporaryFile(delete=False)
default_torrc = NamedTemporaryFile()
torrc = NamedTemporaryFile()
torrc.write("SocksPort 9999")
torrc.close()
default_torrc.write("SafeLogging 0")
@ -161,27 +180,27 @@ class CmdlineTests(unittest.TestCase):
os.unlink(torrc.name)
os.unlink(default_torrc.name)
self.assertEquals(len(lines(out_sh)), 2)
self.assert_(lines(out_sh)[0].startswith("DataDirectory "))
self.assertEquals(lines(out_sh)[1:],
self.assertEqual(len(lines(out_sh)), 2)
self.assertTrue(lines(out_sh)[0].startswith("DataDirectory "))
self.assertEqual(lines(out_sh)[1:],
[ "SocksPort 9999" ])
self.assertEquals(len(lines(out_nb)), 2)
self.assertEquals(lines(out_nb),
self.assertEqual(len(lines(out_nb)), 2)
self.assertEqual(lines(out_nb),
[ "SafeLogging 0",
"SocksPort 9999" ])
out_fl = lines(out_fl)
self.assert_(len(out_fl) > 100)
self.assert_("SocksPort 9999" in out_fl)
self.assert_("SafeLogging 0" in out_fl)
self.assert_("ClientOnly 0" in out_fl)
self.assertTrue(len(out_fl) > 100)
self.assertTrue("SocksPort 9999" in out_fl)
self.assertTrue("SafeLogging 0" in out_fl)
self.assertTrue("ClientOnly 0" in out_fl)
self.assert_(out_verif.endswith("Configuration was valid\n"))
self.assertTrue(out_verif.endswith("Configuration was valid\n"))
def test_list_fingerprint(self):
tmpdir = tempfile.mkdtemp(prefix='ttca_')
torrc = tempfile.NamedTemporaryFile(delete=False)
torrc = NamedTemporaryFile()
torrc.write("ORPort 9999\n")
torrc.write("DataDirectory %s\n"%tmpdir)
torrc.write("Nickname tippi")
@ -200,21 +219,21 @@ class CmdlineTests(unittest.TestCase):
fp = fp.strip()
nn_fp = fp.split()[0]
space_fp = " ".join(spaceify_fp(fp.split()[1]))
self.assertEquals(lastlog,
self.assertEqual(lastlog,
"Your Tor server's identity key fingerprint is '%s'"%fp)
self.assertEquals(lastline, "tippi %s"%space_fp)
self.assertEquals(nn_fp, "tippi")
self.assertEqual(lastline, "tippi %s"%space_fp)
self.assertEqual(nn_fp, "tippi")
def test_list_options(self):
out = lines(run_tor(["--list-torrc-options"]))
self.assert_(len(out)>100)
self.assert_(out[0] <= 'AccountingMax')
self.assert_("UseBridges" in out)
self.assert_("SocksPort" in out)
self.assertTrue(len(out)>100)
self.assertTrue(out[0] <= 'AccountingMax')
self.assertTrue("UseBridges" in out)
self.assertTrue("SocksPort" in out)
def test_cmdline_args(self):
default_torrc = tempfile.NamedTemporaryFile(delete=False)
torrc = tempfile.NamedTemporaryFile(delete=False)
default_torrc = NamedTemporaryFile()
torrc = NamedTemporaryFile()
torrc.write("SocksPort 9999\n")
torrc.write("SocksPort 9998\n")
torrc.write("ORPort 9000\n")
@ -242,14 +261,14 @@ class CmdlineTests(unittest.TestCase):
out_1 = [ l for l in lines(out_1) if not l.startswith("DataDir") ]
out_2 = [ l for l in lines(out_2) if not l.startswith("DataDir") ]
self.assertEquals(out_1,
self.assertEqual(out_1,
["ControlPort 9500",
"Nickname eleventeen",
"ORPort 9000",
"ORPort 9001",
"SocksPort 9999",
"SocksPort 9998"])
self.assertEquals(out_2,
self.assertEqual(out_2,
["ExtORPort 9005",
"Nickname eleventeen",
"ORPort 9000",
@ -261,13 +280,13 @@ class CmdlineTests(unittest.TestCase):
fname = "nonexistent_file_"+randstring(8)
out = run_tor(["-f", fname, "--verify-config"], failure=True)
ln = [ strip_log_junk(l) for l in lines(out) ]
self.assert_("Unable to open configuration file" in ln[-2])
self.assert_("Reading config failed" in ln[-1])
self.assertTrue("Unable to open configuration file" in ln[-2])
self.assertTrue("Reading config failed" in ln[-1])
out = run_tor(["-f", fname, "--verify-config", "--ignore-missing-torrc"])
ln = [ strip_log_junk(l) for l in lines(out) ]
self.assert_(findLineContaining(ln, ", using reasonable defaults"))
self.assert_("Configuration was valid" in ln[-1])
self.assertTrue(findLineContaining(ln, ", using reasonable defaults"))
self.assertTrue("Configuration was valid" in ln[-1])
if __name__ == '__main__':
unittest.main()