[svn-commits] r49 - in drivers/python/trunk: . tests
crogr01 at ingres.com
crogr01 at ingres.com
Thu Jun 5 10:33:37 PDT 2008
Author: crogr01
Date: 2008-06-05 10:33:37 -0700 (Thu, 05 Jun 2008)
New Revision: 49
Added:
drivers/python/trunk/tests/
drivers/python/trunk/tests/data.txt
drivers/python/trunk/tests/dbapi20.py
drivers/python/trunk/tests/test_ingresdbi_dbapi20.py
Log:
Add DBI 2.0 Compliance tests to the repository
Added: drivers/python/trunk/tests/data.txt
===================================================================
--- drivers/python/trunk/tests/data.txt (rev 0)
+++ drivers/python/trunk/tests/data.txt 2008-06-05 17:33:37 UTC (rev 49)
@@ -0,0 +1,10 @@
+INT 1
+LONG 2222222
+FLOAT 3.0
+CHAR i am a string
+VARCHAR i am another string
+INT 1234
+LONG -1045678
+FLOAT 3.0123
+CHAR i am a string
+VARCHAR i am another string
Property changes on: drivers/python/trunk/tests/data.txt
___________________________________________________________________
Name: svn:keywords
+ Date Author Id Rev Url
Added: drivers/python/trunk/tests/dbapi20.py
===================================================================
--- drivers/python/trunk/tests/dbapi20.py (rev 0)
+++ drivers/python/trunk/tests/dbapi20.py 2008-06-05 17:33:37 UTC (rev 49)
@@ -0,0 +1,850 @@
+#!/usr/bin/env python
+''' Python DB API 2.0 driver compliance unit test suite.
+
+ This software is Public Domain and may be used without restrictions.
+
+ "Now we have booze and barflies entering the discussion, plus rumours of
+ DBAs on drugs... and I won't tell you what flashes through my mind each
+ time I read the subject line with 'Anal Compliance' in it. All around
+ this is turning out to be a thoroughly unwholesome unit test."
+
+ -- Ian Bicking
+'''
+
+__rcs_id__ = '$Id$'
+__version__ = '$Revision$'[11:-2]
+__author__ = 'Stuart Bishop <zen at shangri-la.dropbear.id.au>'
+
+import unittest
+import time
+
+# $Log: dbapi20.py,v $
+# Revision 1.10 2003/10/09 03:14:14 zenzen
+# Add test for DB API 2.0 optional extension, where database exceptions
+# are exposed as attributes on the Connection object.
+#
+# Revision 1.9 2003/08/13 01:16:36 zenzen
+# Minor tweak from Stefan Fleiter
+#
+# Revision 1.8 2003/04/10 00:13:25 zenzen
+# Changes, as per suggestions by M.-A. Lemburg
+# - Add a table prefix, to ensure namespace collisions can always be avoided
+#
+# Revision 1.7 2003/02/26 23:33:37 zenzen
+# Break out DDL into helper functions, as per request by David Rushby
+#
+# Revision 1.6 2003/02/21 03:04:33 zenzen
+# Stuff from Henrik Ekelund:
+# added test_None
+# added test_nextset & hooks
+#
+# Revision 1.5 2003/02/17 22:08:43 zenzen
+# Implement suggestions and code from Henrik Eklund - test that cursor.arraysize
+# defaults to 1 & generic cursor.callproc test added
+#
+# Revision 1.4 2003/02/15 00:16:33 zenzen
+# Changes, as per suggestions and bug reports by M.-A. Lemburg,
+# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
+# - Class renamed
+# - Now a subclass of TestCase, to avoid requiring the driver stub
+# to use multiple inheritance
+# - Reversed the polarity of buggy test in test_description
+# - Test exception heirarchy correctly
+# - self.populate is now self._populate(), so if a driver stub
+# overrides self.ddl1 this change propogates
+# - VARCHAR columns now have a width, which will hopefully make the
+# DDL even more portible (this will be reversed if it causes more problems)
+# - cursor.rowcount being checked after various execute and fetchXXX methods
+# - Check for fetchall and fetchmany returning empty lists after results
+# are exhausted (already checking for empty lists if select retrieved
+# nothing
+# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
+#
+
+class DatabaseAPI20Test(unittest.TestCase):
+ ''' Test a database self.driver for DB API 2.0 compatibility.
+ This implementation tests Gadfly, but the TestCase
+ is structured so that other self.drivers can subclass this
+ test case to ensure compiliance with the DB-API. It is
+ expected that this TestCase may be expanded in the future
+ if ambiguities or edge conditions are discovered.
+
+ The 'Optional Extensions' are not yet being tested.
+
+ self.drivers should subclass this test, overriding setUp, tearDown,
+ self.driver, connect_args and connect_kw_args. Class specification
+ should be as follows:
+
+ import dbapi20
+ class mytest(dbapi20.DatabaseAPI20Test):
+ [...]
+
+ Don't 'import DatabaseAPI20Test from dbapi20', or you will
+ confuse the unit tester - just 'import dbapi20'.
+ '''
+
+ # The self.driver module. This should be the module where the 'connect'
+ # method is to be found
+ driver = None
+ connect_args = () # List of arguments to pass to connect
+ connect_kw_args = {} # Keyword arguments for connect
+ table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
+
+ ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
+ ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
+ xddl1 = 'drop table %sbooze' % table_prefix
+ xddl2 = 'drop table %sbarflys' % table_prefix
+
+ lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
+
+ # Some drivers may need to override these helpers, for example adding
+ # a 'commit' after the execute.
+ def executeDDL1(self,cursor):
+ cursor.execute(self.ddl1)
+
+ def executeDDL2(self,cursor):
+ cursor.execute(self.ddl2)
+
+ def setUp(self):
+ ''' self.drivers should override this method to perform required setup
+ if any is necessary, such as creating the database.
+ '''
+ pass
+
+ def tearDown(self):
+ ''' self.drivers should override this method to perform required cleanup
+ if any is necessary, such as deleting the test database.
+ The default drops the tables that may be created.
+ '''
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ for ddl in (self.xddl1,self.xddl2):
+ try:
+ cur.execute(ddl)
+ con.commit()
+ except self.driver.Error:
+ # Assume table didn't exist. Other tests will check if
+ # execute is busted.
+ pass
+ finally:
+ con.close()
+
+ def _connect(self):
+ try:
+ return self.driver.connect(
+ *self.connect_args,**self.connect_kw_args
+ )
+ except AttributeError:
+ self.fail("No connect method found in self.driver module")
+
+ def test_connect(self):
+ con = self._connect()
+ con.close()
+
+ def test_apilevel(self):
+ try:
+ # Must exist
+ apilevel = self.driver.apilevel
+ # Must equal 2.0
+ self.assertEqual(apilevel,'2.0')
+ except AttributeError:
+ self.fail("Driver doesn't define apilevel")
+
+ def test_threadsafety(self):
+ try:
+ # Must exist
+ threadsafety = self.driver.threadsafety
+ # Must be a valid value
+ self.failUnless(threadsafety in (0,1,2,3))
+ except AttributeError:
+ self.fail("Driver doesn't define threadsafety")
+
+ def test_paramstyle(self):
+ try:
+ # Must exist
+ paramstyle = self.driver.paramstyle
+ # Must be a valid value
+ self.failUnless(paramstyle in (
+ 'qmark','numeric','named','format','pyformat'
+ ))
+ except AttributeError:
+ self.fail("Driver doesn't define paramstyle")
+
+ def test_Exceptions(self):
+ # Make sure required exceptions exist, and are in the
+ # defined heirarchy.
+ self.failUnless(issubclass(self.driver.Warning,StandardError))
+ self.failUnless(issubclass(self.driver.Error,StandardError))
+ self.failUnless(
+ issubclass(self.driver.InterfaceError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.DatabaseError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.OperationalError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.IntegrityError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.InternalError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.ProgrammingError,self.driver.Error)
+ )
+ self.failUnless(
+ issubclass(self.driver.NotSupportedError,self.driver.Error)
+ )
+
+ def test_ExceptionsAsConnectionAttributes(self):
+ # OPTIONAL EXTENSION
+ # Test for the optional DB API 2.0 extension, where the exceptions
+ # are exposed as attributes on the Connection object
+ # I figure this optional extension will be implemented by any
+ # driver author who is using this test suite, so it is enabled
+ # by default.
+ con = self._connect()
+ drv = self.driver
+ self.failUnless(con.Warning is drv.Warning)
+ self.failUnless(con.Error is drv.Error)
+ self.failUnless(con.InterfaceError is drv.InterfaceError)
+ self.failUnless(con.DatabaseError is drv.DatabaseError)
+ self.failUnless(con.OperationalError is drv.OperationalError)
+ self.failUnless(con.IntegrityError is drv.IntegrityError)
+ self.failUnless(con.InternalError is drv.InternalError)
+ self.failUnless(con.ProgrammingError is drv.ProgrammingError)
+ self.failUnless(con.NotSupportedError is drv.NotSupportedError)
+
+
+ def test_commit(self):
+ con = self._connect()
+ try:
+ # Commit must work, even if it doesn't do anything
+ con.commit()
+ finally:
+ con.close()
+
+ def test_rollback(self):
+ con = self._connect()
+ # If rollback is defined, it should either work or throw
+ # the documented exception
+ if hasattr(con,'rollback'):
+ try:
+ con.rollback()
+ except self.driver.NotSupportedError:
+ pass
+
+ def test_cursor(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ def test_cursor_isolation(self):
+ con = self._connect()
+ try:
+ # Make sure cursors created from the same connection have
+ # the documented transaction isolation level
+ cur1 = con.cursor()
+ cur2 = con.cursor()
+ self.executeDDL1(cur1)
+ cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ cur2.execute("select name from %sbooze" % self.table_prefix)
+ booze = cur2.fetchall()
+ self.assertEqual(len(booze),1)
+ self.assertEqual(len(booze[0]),1)
+ self.assertEqual(booze[0][0],'Victoria Bitter')
+ finally:
+ con.close()
+
+ def test_description(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description should be none after executing a '
+ 'statement that can return no rows (such as DDL)'
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(len(cur.description),1,
+ 'cursor.description describes too many columns'
+ )
+ self.assertEqual(len(cur.description[0]),7,
+ 'cursor.description[x] tuples must have 7 elements'
+ )
+ self.assertEqual(cur.description[0][0].lower(),'name',
+ 'cursor.description[x][0] must return column name'
+ )
+ self.assertEqual(cur.description[0][1],self.driver.STRING,
+ 'cursor.description[x][1] must return column type. Got %r'
+ % cur.description[0][1]
+ )
+
+ # Make sure self.description gets reset
+ self.executeDDL2(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description not being set to None when executing '
+ 'no-result statements (eg. DDL)'
+ )
+ finally:
+ con.close()
+
+ def test_rowcount(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ self.assertEqual(cur.rowcount,-1,
+ 'cursor.rowcount should be -1 after executing no-result '
+ 'statements'
+ )
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.failUnless(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number or rows inserted, or '
+ 'set to -1 after executing an insert statement'
+ )
+ cur.execute("select name from %sbooze" % self.table_prefix)
+ self.failUnless(cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number of rows returned, or '
+ 'set to -1 after executing a select statement'
+ )
+ self.executeDDL2(cur)
+ self.assertEqual(cur.rowcount,-1,
+ 'cursor.rowcount not being reset to -1 after executing '
+ 'no-result statements'
+ )
+ finally:
+ con.close()
+
+ lower_func = 'lower'
+ def test_callproc(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if self.lower_func and hasattr(cur,'callproc'):
+ r = cur.callproc(self.lower_func,('FOO',))
+ self.assertEqual(len(r),1)
+ self.assertEqual(r[0],'FOO')
+ r = cur.fetchall()
+ self.assertEqual(len(r),1,'callproc produced no result set')
+ self.assertEqual(len(r[0]),1,
+ 'callproc produced invalid result set'
+ )
+ self.assertEqual(r[0][0],'foo',
+ 'callproc produced invalid results'
+ )
+ finally:
+ con.close()
+
+ def test_close(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ # cursor.execute should raise an Error if called after connection
+ # closed
+ self.assertRaises(self.driver.Error,self.executeDDL1,cur)
+
+ # connection.commit should raise an Error if called after connection'
+ # closed.'
+ self.assertRaises(self.driver.Error,con.commit)
+
+ # connection.close should raise an Error if called more than once
+ self.assertRaises(self.driver.Error,con.close)
+
+ def test_execute(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self._paraminsert(cur)
+ finally:
+ con.close()
+
+ def _paraminsert(self,cur):
+ self.executeDDL1(cur)
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.failUnless(cur.rowcount in (-1,1))
+
+ if self.driver.paramstyle == 'qmark':
+ cur.execute(
+ 'insert into %sbooze values (?)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.execute(
+ 'insert into %sbooze values (:1)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.execute(
+ 'insert into %sbooze values (:beer)' % self.table_prefix,
+ {'beer':"Cooper's"}
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.execute(
+ 'insert into %sbooze values (%%s)' % self.table_prefix,
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.execute(
+ 'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
+ {'beer':"Cooper's"}
+ )
+ else:
+ self.fail('Invalid paramstyle')
+ self.failUnless(cur.rowcount in (-1,1))
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Cooper's",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+ self.assertEqual(beers[1],"Victoria Bitter",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+
+ def test_executemany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ largs = [ ("Cooper's",) , ("Boag's",) ]
+ margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
+ if self.driver.paramstyle == 'qmark':
+ cur.executemany(
+ 'insert into %sbooze values (?)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.executemany(
+ 'insert into %sbooze values (:1)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.executemany(
+ 'insert into %sbooze values (:beer)' % self.table_prefix,
+ margs
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.executemany(
+ 'insert into %sbooze values (%%s)' % self.table_prefix,
+ largs
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.executemany(
+ 'insert into %sbooze values (%%(beer)s)' % (
+ self.table_prefix
+ ),
+ margs
+ )
+ else:
+ self.fail('Unknown paramstyle')
+ self.failUnless(cur.rowcount in (-1,2),
+ 'insert using cursor.executemany set cursor.rowcount to '
+ 'incorrect value %r' % cur.rowcount
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,
+ 'cursor.fetchall retrieved incorrect number of rows'
+ )
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
+ self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
+ finally:
+ con.close()
+
+ def test_fetchone(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchone should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if a query retrieves '
+ 'no rows'
+ )
+ self.failUnless(cur.rowcount in (-1,0))
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+ self.table_prefix
+ ))
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchone()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchone should have retrieved a single row'
+ )
+ self.assertEqual(r[0],'Victoria Bitter',
+ 'cursor.fetchone retrieved incorrect data'
+ )
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if no more rows available'
+ )
+ self.failUnless(cur.rowcount in (-1,1))
+ finally:
+ con.close()
+
+ samples = [
+ 'Carlton Cold',
+ 'Carlton Draft',
+ 'Mountain Goat',
+ 'Redback',
+ 'Victoria Bitter',
+ 'XXXX'
+ ]
+
+ def _populate(self):
+ ''' Return a list of sql commands to setup the DB for the fetch
+ tests.
+ '''
+ populate = [
+ "insert into %sbooze values ('%s')" % (self.table_prefix,s)
+ for s in self.samples
+ ]
+ return populate
+
+ def test_fetchmany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchmany should raise an Error if called without
+ #issuing a query
+ self.assertRaises(self.driver.Error,cur.fetchmany,4)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchmany retrieved incorrect number of rows, '
+ 'default of arraysize is one.'
+ )
+ cur.arraysize=10
+ r = cur.fetchmany(3) # Should get 3 rows
+ self.assertEqual(len(r),3,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should get 2 more
+ self.assertEqual(len(r),2,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should be an empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence after '
+ 'results are exhausted'
+ )
+ self.failUnless(cur.rowcount in (-1,6))
+
+ # Same as above, using cursor.arraysize
+ cur.arraysize=4
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany() # Should get 4 rows
+ self.assertEqual(len(r),4,
+ 'cursor.arraysize not being honoured by fetchmany'
+ )
+ r = cur.fetchmany() # Should get 2 more
+ self.assertEqual(len(r),2)
+ r = cur.fetchmany() # Should be an empty sequence
+ self.assertEqual(len(r),0)
+ self.failUnless(cur.rowcount in (-1,6))
+
+ cur.arraysize=6
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchmany() # Should get all rows
+ self.failUnless(cur.rowcount in (-1,6))
+ self.assertEqual(len(rows),6)
+ self.assertEqual(len(rows),6)
+ rows = [r[0] for r in rows]
+ rows.sort()
+
+ # Make sure we get the right data back out
+ for i in range(0,6):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved by cursor.fetchmany'
+ )
+
+ rows = cur.fetchmany() # Should return an empty list
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'called after the whole result set has been fetched'
+ )
+ self.failUnless(cur.rowcount in (-1,6))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ r = cur.fetchmany() # Should get empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'query retrieved no rows'
+ )
+ self.failUnless(cur.rowcount in (-1,0))
+
+ finally:
+ con.close()
+
+ def test_fetchall(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ # cursor.fetchall should raise an Error if called
+ # without executing a query that may return rows (such
+ # as a select)
+ self.assertRaises(self.driver.Error, cur.fetchall)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ # cursor.fetchall should raise an Error if called
+ # after executing a a statement that cannot return rows
+ self.assertRaises(self.driver.Error,cur.fetchall)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchall()
+ self.failUnless(cur.rowcount in (-1,len(self.samples)))
+ self.assertEqual(len(rows),len(self.samples),
+ 'cursor.fetchall did not retrieve all rows'
+ )
+ rows = [r[0] for r in rows]
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'cursor.fetchall retrieved incorrect rows'
+ )
+ rows = cur.fetchall()
+ self.assertEqual(
+ len(rows),0,
+ 'cursor.fetchall should return an empty list if called '
+ 'after the whole result set has been fetched'
+ )
+ self.failUnless(cur.rowcount in (-1,len(self.samples)))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ rows = cur.fetchall()
+ self.failUnless(cur.rowcount in (-1,0))
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchall should return an empty list if '
+ 'a select query returns no rows'
+ )
+
+ finally:
+ con.close()
+
+ def test_mixedfetch(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows1 = cur.fetchone()
+ rows23 = cur.fetchmany(2)
+ rows4 = cur.fetchone()
+ rows56 = cur.fetchall()
+ self.failUnless(cur.rowcount in (-1,6))
+ self.assertEqual(len(rows23),2,
+ 'fetchmany returned incorrect number of rows'
+ )
+ self.assertEqual(len(rows56),2,
+ 'fetchall returned incorrect number of rows'
+ )
+
+ rows = [rows1[0]]
+ rows.extend([rows23[0][0],rows23[1][0]])
+ rows.append(rows4[0])
+ rows.extend([rows56[0][0],rows56[1][0]])
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved or inserted'
+ )
+ finally:
+ con.close()
+
+ def help_nextset_setUp(self,cur):
+ ''' Should create a procedure called deleteme
+ that returns two result sets, first the
+ number of rows in booze then "name from booze"
+ '''
+ raise NotImplementedError,'Helper not implemented'
+ #sql="""
+ # create procedure deleteme as
+ # begin
+ # select count(*) from booze
+ # select name from booze
+ # end
+ #"""
+ #cur.execute(sql)
+
+ def help_nextset_tearDown(self,cur):
+ 'If cleaning up is needed after nextSetTest'
+ raise NotImplementedError,'Helper not implemented'
+ #cur.execute("drop procedure deleteme")
+
+ def test_nextset(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur,'nextset'):
+ return
+
+ try:
+ self.executeDDL1(cur)
+ sql=self._populate()
+ for sql in self._populate():
+ cur.execute(sql)
+
+ self.help_nextset_setUp(cur)
+
+ cur.callproc('deleteme')
+ numberofrows=cur.fetchone()
+ assert numberofrows[0]== len(self.samples)
+ assert cur.nextset()
+ names=cur.fetchall()
+ assert len(names) == len(self.samples)
+ s=cur.nextset()
+ assert s == None,'No more return sets, should return None'
+ finally:
+ self.help_nextset_tearDown(cur)
+
+ finally:
+ con.close()
+
+ def test_nextset(self):
+ raise NotImplementedError,'Drivers need to override this test'
+
+ def test_arraysize(self):
+ # Not much here - rest of the tests for this are in test_fetchmany
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.failUnless(hasattr(cur,'arraysize'),
+ 'cursor.arraysize must be defined'
+ )
+ finally:
+ con.close()
+
+ def test_setinputsizes(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setinputsizes( (25,) )
+ self._paraminsert(cur) # Make sure cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize_basic(self):
+ # Basic test is to make sure setoutputsize doesn't blow up
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setoutputsize(1000)
+ cur.setoutputsize(2000,0)
+ self._paraminsert(cur) # Make sure the cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize(self):
+ # Real test for setoutputsize is driver dependant
+ raise NotImplementedError,'Driver need to override this test'
+
+ def test_None(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchall()
+ self.assertEqual(len(r),1)
+ self.assertEqual(len(r[0]),1)
+ self.assertEqual(r[0][0],None,'NULL value not returned as None')
+ finally:
+ con.close()
+
+ def test_Date(self):
+ d1 = self.driver.Date(2002,12,25)
+ d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(d1),str(d2))
+
+ def test_Time(self):
+ t1 = self.driver.Time(13,45,30)
+ t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Timestamp(self):
+ t1 = self.driver.Timestamp(2002,12,25,13,45,30)
+ t2 = self.driver.TimestampFromTicks(
+ time.mktime((2002,12,25,13,45,30,0,0,0))
+ )
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Binary(self):
+ b = self.driver.Binary('Something')
+ b = self.driver.Binary('')
+
+ def test_STRING(self):
+ self.failUnless(hasattr(self.driver,'STRING'),
+ 'module.STRING must be defined'
+ )
+
+ def test_BINARY(self):
+ self.failUnless(hasattr(self.driver,'BINARY'),
+ 'module.BINARY must be defined.'
+ )
+
+ def test_NUMBER(self):
+ self.failUnless(hasattr(self.driver,'NUMBER'),
+ 'module.NUMBER must be defined.'
+ )
+
+ def test_DATETIME(self):
+ self.failUnless(hasattr(self.driver,'DATETIME'),
+ 'module.DATETIME must be defined.'
+ )
+
+ def test_ROWID(self):
+ self.failUnless(hasattr(self.driver,'ROWID'),
+ 'module.ROWID must be defined.'
+ )
+
Property changes on: drivers/python/trunk/tests/dbapi20.py
___________________________________________________________________
Name: svn:keywords
+ Date Author Id Rev Url
Added: drivers/python/trunk/tests/test_ingresdbi_dbapi20.py
===================================================================
--- drivers/python/trunk/tests/test_ingresdbi_dbapi20.py (rev 0)
+++ drivers/python/trunk/tests/test_ingresdbi_dbapi20.py 2008-06-05 17:33:37 UTC (rev 49)
@@ -0,0 +1,428 @@
+#!/usr/bin/env python
+"""
+ Test script for the Ingres Python DBI based on test_psycopg_dbapi20.py.
+ Uses dbapi20.py version 1.10 obtained from
+ http://stuartbishop.net/Software/DBAPI20TestSuite/
+
+ History:
+ 02-May-2008 (grant.croker at ingres.com)
+ Created
+ 06-May-2008
+ Added the detection/creation of the procedure 'lower' to setUp
+"""
+import dbapi20
+import unittest
+import ingresdbi
+import popen2
+import re
+import os
+
+"""
+
+ By default the test expects a locally accessible database called
+ dbapi20_test. This, along with other settings, can be overridden
+ using the following OS environment variables:
+
+ test_dsn - ODBC data source name
+ test_username - user to connect as
+ test_password - password for the above user
+ test_vnode - Ingres/NET virtual node alias (use netutil to
+ define)
+ test_database - database name, defaults to dbapi20_test if not
+ specified
+ test_pooled - *Windows only*, use ODBC pooling, Boolean
+ (true, on, false, off)
+ test_trace - Level of trace output 0-7, default 0
+ test_trace_file - where to put trace output, if not set output
+ goes to STDERR
+
+ Windows users use:
+ set VARNAME=value
+ for example
+ set test_database=pytestdb
+ Linux/UNIX/Mac OS X use:
+ VARNAME=value; export VARNAME # sh,ksh,bash Bourne shell variants
+ setenv VARNAME=value # csh, tcsh C-shell variants
+
+ for example
+ test_database=pytestdb; export test_database # sh,ksh,bash
+ setenv test_database pytestdb # csh, tcsh
+
+"""
+dsn=os.getenv('test_dsn')
+username=os.getenv('test_username')
+password=os.getenv('test_password')
+vnode=os.getenv('test_vnode')
+database=os.getenv('test_database')
+pooled=os.getenv('test_pooled') # Windows only
+traceLevel_str=os.getenv('test_trace')
+if traceLevel_str != None:
+ traceLevel = int(traceLevel_str)
+else:
+ traceLevel = 0
+traceFile=os.getenv('test_trace_file')
+
+class test_Ingresdbi(dbapi20.DatabaseAPI20Test):
+ driver = ingresdbi
+ connect_args = ()
+ connect_kw_args = {}
+
+ if dsn != None:
+ connect_kw_args.setdefault('dsn',dsn)
+ if username != None:
+ connect_kw_args.setdefault('uid',username)
+ if password != None:
+ connect_kw_args.setdefault('pwd',password)
+ if vnode != None:
+ connect_kw_args.setdefault('vnode',vnode)
+ if database != None:
+ connect_kw_args.setdefault('database',database)
+ else:
+ connect_kw_args.setdefault('database', 'dbapi20_test')
+ if pooled != None:
+ connect_kw_args.setdefault('pooled', pooled)
+ trace=(traceLevel, traceFile)
+ else:
+ trace=(0, None)
+ connect_kw_args.setdefault('trace',trace)
+ table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
+
+ lower_func = 'lower' # For stored procedure test
+
+ def setUp(self):
+ # Call superclass setUp In case this does something in the
+ # future
+ dbapi20.DatabaseAPI20Test.setUp(self)
+
+ try:
+ con = self._connect()
+ try:
+ # Do we have a procedure called lower?
+ cur=con.cursor()
+ cur.execute("select procedure_name from iiprocedures where procedure_name = 'lower'")
+ if len(cur.fetchall()) == 0:
+ cur.execute("create procedure lower( a varchar(20) not null) result row (varchar(20)) as declare x=varchar(20) not null; begin select lower(a) into x; return row(x); end")
+ con.commit()
+ con.close()
+ except:
+ pass
+ except:
+ cmd = "createdb -fnofeclients dbapi20_test"
+ cout,cin = popen2.popen2(cmd)
+ cin.close()
+ cout.read()
+
+ def tearDown(self):
+ dbapi20.DatabaseAPI20Test.tearDown(self)
+
+ def test_nextset(self): pass
+ def test_setoutputsize(self): pass
+
+ """
+ Additional Ingres DBI tests
+
+ For the time being these are kept out of dbapi20.py so as not to pollute
+ the test suite.
+ """
+
+ ddl3 = 'create table %sblob1 (name long varchar)' % table_prefix
+ ddl4 = 'create table %sblob2 (name long byte)' % table_prefix
+ ddl5 = 'create table %stdata (colint1 integer, colbigint2 bigint, colfloat3 float, colchar4 char (20), colvarchar5 varchar(20))' % table_prefix
+
+ xddl3 = 'drop table %sblob1' % table_prefix
+ xddl4 = 'drop table %sblob2' % table_prefix
+ xddl5 = 'drop table %stdata' % table_prefix
+
+ def executeDDL3(self,cursor):
+ cursor.execute(self.ddl3)
+
+ def executeDDL4(self,cursor):
+ cursor.execute(self.ddl4)
+
+ def executeDDL5(self,cursor):
+ cursor.execute(self.ddl5)
+
+ def test_smallblob(self):
+ blob = "123456"
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL3(cur)
+ cur.execute("insert into %sblob1 values (?)" %
+ self.table_prefix, (blob,) )
+ cur.execute("select * from %sblob1 " %
+ self.table_prefix, (blob,) )
+ rs = cur.fetchall()
+ for x in range (0, len(blob)):
+ self.assertEqual(blob[x],rs[0][0][x])
+
+ finally:
+ cur.close()
+ con.close()
+
+ def test_4Kblob(self):
+ blob = "1234567890" * 400
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL3(cur)
+ cur.execute("insert into %sblob1 values (?)" %
+ self.table_prefix, (blob,) )
+ cur.execute("select * from %sblob1 " %
+ self.table_prefix, (blob,) )
+ rs = cur.fetchall()
+ for x in range (0, len(blob)):
+ self.assertEqual(blob[x],rs[0][0][x])
+
+ finally:
+ cur.close()
+ con.close()
+ del(blob)
+
+
+ def test_40Kblob(self):
+ blob = self.driver.Binary("1234567890" * 4000)
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL4(cur)
+ cur.execute("insert into %sblob2 values (?)" %
+ self.table_prefix, (blob,) )
+ cur.execute("select * from %sblob2 " %
+ self.table_prefix, (blob,) )
+ rs = cur.fetchall()
+ for x in range (0, len(blob)):
+ self.assertEqual(blob[x],rs[0][0][x])
+
+ finally:
+ cur.close()
+ con.close()
+ del(blob)
+
+
+ def test_40K2R(self):
+ blob = self.driver.Binary("1234567890" * 4000)
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL4(cur)
+ cur.execute("insert into %sblob2 values (?)" %
+ self.table_prefix, (blob,) )
+ cur.execute("insert into %sblob2 values (?)" %
+ self.table_prefix, (blob,) )
+ cur.execute("select * from %sblob2 " %
+ self.table_prefix, (blob,) )
+ rs = cur.fetchall()
+ for x in range (0, len(blob)):
+ self.assertEqual(blob[x],rs[0][0][x])
+ for x in range (0, len(blob)):
+ self.assertEqual(blob[x],rs[1][0][x])
+
+ finally:
+ cur.close()
+ con.close()
+ del(blob)
+
+ def test_data(self):
+ fobj = file("data.txt","r")
+ list = fobj.readlines()
+ fobj.close()
+
+ eog = False
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL5(cur)
+ for x in range (0, len(list)):
+ tdata = list[x].rstrip('\n')
+ if (tdata.startswith("INT ")):
+ intdata = int(tdata.lstrip("INT "))
+ elif (tdata.startswith("LONG ")):
+ longdata = long(tdata.lstrip("LONG "))
+ elif (tdata.startswith("FLOAT ")):
+ floatdata = float(tdata.lstrip("FLOAT "))
+ elif (tdata.startswith("CHAR ")):
+ chardata = (tdata.lstrip("CHAR "))
+ elif (tdata.startswith("VARCHAR ")):
+ varchardata = (tdata.lstrip("VARCHAR "))
+ eog = True
+ if (eog == True):
+ eog = False
+ cur.execute("insert into %stdata values (?, ?, ?, ?, ?)" %
+ self.table_prefix, (intdata, longdata, floatdata,
+ chardata, varchardata))
+ cur.execute("select * from %stdata" % self.table_prefix)
+ rs = cur.fetchall()
+ for y in range(0, len(rs)):
+ for x in range (0, 5):
+ tdata = list[x+(5*y)].rstrip('\n')
+ if (tdata.startswith("INT ")):
+ intdata = int(tdata.lstrip("INT "))
+ self.assertEqual(intdata, rs[y][0])
+ elif (tdata.startswith("LONG ")):
+ longdata = long(tdata.lstrip("LONG "))
+ self.assertEqual(longdata, rs[y][1])
+ elif (tdata.startswith("FLOAT ")):
+ floatdata = float(tdata.lstrip("FLOAT "))
+ self.assertEqual(floatdata, rs[y][2])
+ elif (tdata.startswith("CHAR ")):
+ chardata = (tdata.lstrip("CHAR "))
+ m = re.search(chardata, rs[y][3])
+ substr = m.group()
+ self.assertEqual(chardata, substr)
+ elif (tdata.startswith("VARCHAR ")):
+ varchardata = (tdata.lstrip("VARCHAR "))
+ self.assertEqual(varchardata, rs[y][4])
+ finally:
+ cur.close()
+ con.close()
+
+ def test_connectionAttributes(self):
+ kw_args = self.connect_kw_args.copy()
+
+ kw_args.setdefault('vnode', '(LOCAL)')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('servertype', 'Ingres')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('driver', 'Ingres')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('selectloops', 'on')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('rolename', 'nullrole')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('rolepwd', 'nullpwd')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('group', 'nullgroup')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('numeric_overflow', 'N')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+ kw_args.setdefault('dbms_pwd', 'nulldbmspwd')
+ self.con = self.driver.connect( *self.connect_args, **kw_args)
+ self.con.close()
+
+
+ def test_cursorPreparedAttribute(self):
+ self.con = self._connect()
+ try:
+ self.curs = self.con.cursor()
+ self.curs.prepared = "Yes"
+ self.curs.execute("select * from iitables")
+ l = self.curs.fetchall()
+ self.failIfEqual(len(l), 0)
+ finally:
+ self.curs.close()
+ self.con.close()
+
+ def test_cursorMessages(self):
+ self.con = self._connect()
+ try:
+ self.curs = self.con.cursor()
+ self.curs.prepared = "Yes"
+ self.curs.execute("select * from iitables")
+ self.curs.execute("select * from iidbcapabilities")
+ self.failIfEqual(len(self.curs.messages), 0)
+ finally:
+ self.curs.close()
+ self.con.close()
+
+ def test_connectionMessages(self):
+ self.con = self._connect()
+ self.con.close()
+ try:
+ self.con.close()
+ except self.driver.Error:
+ pass
+ self.failIfEqual(len(self.con.messages), 0)
+
+ def test_connectionErrorHandler(self):
+ def connHandler(connection, cursor, exception, msg):
+ self.failUnless(connection is self.con)
+ self.failUnless(cursor == None)
+ self.failIf(exception == None)
+ self.failIf(len(msg) == 0)
+ self.con = self._connect()
+ self.con.errorhandler = connHandler
+ self.con.close()
+ self.con.close()
+
+ def test_cursorConnectionAttribute(self):
+ self.con = self._connect()
+ try:
+ self.curs = self.con.cursor()
+ self.failUnless(hasattr(self.curs,'connection'))
+ self.failUnlessEqual(self.curs.connection, self.con)
+ finally:
+ self.curs.close()
+ self.con.close()
+
+ def test_cursorErrorHandler(self):
+ def handler(connection, cursor, exception, msg):
+ self.failUnless(connection is self.con)
+ self.failUnless(cursor == self.curs)
+ self.failIf(exception == None)
+ self.failIf(len(msg) == 0)
+ self.con = self._connect()
+ try:
+ self.curs = self.con.cursor()
+ self.curs.errorhandler = handler
+ self.curs.execute("select * from xiitables")
+ finally:
+ self.curs.close()
+ self.con.close()
+
+ def test_cursorIter(self):
+ self.con = self._connect()
+ try:
+ self.curs = self.con.cursor()
+ self.curs.execute("select * from iitables")
+ for i in self.curs:
+ self.failIfEqual(i, None)
+ self.failIfEqual(len(i[0]), 0)
+ finally:
+ self.curs.close()
+ self.con.close()
+
+ def test_cursorScroll(self):
+ # Not supported until scrollable cursors are implemented
+ self.con = self._connect()
+ self.curs = self.con.cursor()
+ self.curs.execute("select * from iitables")
+ try:
+ self.curs.scroll(1)
+ except self.driver.NotSupportedError:
+ pass
+ self.curs.close()
+ self.con.close()
+
+ def test_cursorLastRowId(self):
+ # Not supported if or until row ID's are implemented
+ self.con = self._connect()
+ self.curs = self.con.cursor()
+ self.curs.execute("select * from iitables")
+ try:
+ rowid = self.curs.lastrowid
+ except self.driver.NotSupportedError:
+ pass
+ self.curs.close()
+ self.con.close()
+
+ """ End of Ingres custom tests """
+
+if __name__ == '__main__':
+ unittest.main()
Property changes on: drivers/python/trunk/tests/test_ingresdbi_dbapi20.py
___________________________________________________________________
Name: svn:keywords
+ Date Author Id Rev Url
More information about the svn-commits
mailing list