You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
204 lines
6.6 KiB
Python
204 lines
6.6 KiB
Python
import pipes
|
|
import os
|
|
import string
|
|
import unittest
|
|
import shutil
|
|
from test.support import TESTFN, run_unittest, unlink, reap_children
|
|
|
|
# Skip the whole module on non-posix platforms.
if os.name != 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

# Second scratch path, used alongside TESTFN by the tests.
TESTFN2 = TESTFN + "2"

# tr a-z A-Z is not portable, so make the ranges explicit
s_command = 'tr %s %s' % (
    string.ascii_lowercase,
    string.ascii_uppercase,
)
|
|
|
|
class SimplePipeTests(unittest.TestCase):
    """Tests for ``pipes.Template``.

    Covers running pipelines that upper-case text through ``tr``, empty
    pipelines (copy / read / write), ``repr``, the debug flag, argument
    validation in ``append`` / ``prepend`` / ``open``, and ``clone``.
    """

    def tearDown(self):
        """Remove both scratch paths after every test."""
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def _require_tr(self):
        """Skip the current test when no ``tr`` executable is on the PATH."""
        if shutil.which('tr') is None:
            self.skipTest('tr is not available')

    def testSimplePipe1(self):
        """Writing through a STDIN_STDOUT step stores the upper-cased text."""
        self._require_tr()
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        # Use a context manager so the pipe is closed even if write() raises;
        # the file must be closed before its content is read back below.
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        """copy() through a FILEIN_FILEOUT step writes upper-cased output."""
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        """Reading through a FILEIN_STDOUT step yields upper-cased text."""
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        """copy() with no steps copies the input file unchanged."""
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        # Start from an existing, empty destination file.
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        """open(..., 'r') with no steps reads the file unchanged."""
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        """open(..., 'w') with no steps writes the text unchanged."""
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        """repr() lists the template's steps as (command, kind) pairs."""
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(repr(t),
                         "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        """debug(flag) is reflected in the ``debugging`` attribute."""
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        """open('r') on a pipeline ending with a sink raises ValueError."""
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        """open('w') on a pipeline starting with a source raises ValueError."""
        # check calling open('w') on a pipe ending with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        """append() rejects bad command types, kinds and missing $IN/$OUT."""
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        """prepend() rejects bad command types, kinds and missing $IN/$OUT."""
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        """open() with a mode other than 'r' or 'w' raises ValueError."""
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        """clone() returns a distinct template with an equal, unshared step list."""
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        # Identity checks: the clone and its step list must be new objects.
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
|
|
|
|
def test_main():
    """Run SimplePipeTests, then reap leftover child processes."""
    try:
        run_unittest(SimplePipeTests)
    finally:
        # Reap even if run_unittest raises, so a failing run still cleans up.
        reap_children()
|
|
|
|
# Script entry point: run the suite when this file is executed directly.
if __name__ == "__main__":
    test_main()
|