Skip to main content
 首页 » 编程设计

python操作oracle完整教程

2022年07月19日155sharpest

python操作oracle完整教程

1.    连接对象

操作数据库之前,首先要建立数据库连接。有下面几个方法进行连接。

>>>import cx_Oracle
>>>db = cx_Oracle.connect('hr', 'hrpwd', 'localhost:1521/XE')

>>>db1 = cx_Oracle.connect('hr/hrpwd@localhost:1521/XE')
>>>dsn_tns = cx_Oracle.makedsn('localhost', 1521, 'XE')
>>>print dsn_tns

>>>print db.version
10.2.0.1.0
>>> versioning = db.version.split('.')
>>> print versioning
['10', '2', '0', '1', '0']
>>> if versioning[0]=='10':
... print "Running 10g"
... elif versioning[0]=='9':
... print "Running 9i"
...
Running 10g
>>> print db.dsn

localhost:1521/XE

2.    cursor对象

      使用数据库连接对象的cursor()方法,你可以定义任意数量的cursor对象,简单的程序可能使用一个cursor,并重复使用了,但大型项目会使用多个不同的cursor。

>>>cursor= db.cursor()

应用程序逻辑通常需要清楚的区分处理数据操作的每个阶段。这将帮助更好的理解性能瓶颈和代码优化。这些步骤有:

parse(optional)

无需调用该方法,因为执行阶段会自动先执行,用于检查sql语句是否正确,当有错误时,抛出DatabaseError异常及相应的错误信息。如:‘’ORA-00900:invalid SQL statement.“。

Execute

cx_Oracle.Cursor.execute(

statement,[parameters], **keyword_parameters)

该方法能接收单个参数SQL,直接操作数据库,也可以通过绑定变量执行动态SQL,parames或keyworparameters可以是字典、序列或一组关键字参数。

cx_Oracle.Cursor.executemany(statement,parameters)

特别有用的批量插入,避免一次只能插入一条;

Fetch(optional)

仅用于查询,因为DDL和DCL语句没有返回结果。如果cursor没有执行查询,会抛出InterfaceError异常。

cx_Oracle.Cursor.fetchall() 

获取所有结果集,返回元祖列表,如果没有有效行,返回空列表。

cx_Oracle.Cursor.fetchmany([rows_no]) 

从数据库中取下一个rows_no数据

cx_Oracle.Cursor.fetchone() 

从数据库中取单个元祖,如果没有有效数据返回none。

3.    绑定变量

绑定变量查询可以提高效率,避免不必要的编译;参数可以是名称参数或位置参数,尽量使用名称绑定。

>>>named_params = {'dept_id':50, 'sal':1000}

>>>query1 = cursor.execute('SELECT * FROM employees WHERE department_id=:dept_idAND salary>:sal', named_params)
>>> query2 = cursor.execute('SELECT * FROM employees WHERE department_id=:dept_idAND salary>:sal', dept_id=50, sal=1000)

Whenusing named bind variables you can check the currently assigned ones using thebindnames() method of the cursor:

 

>>> printcursor.bindnames() 
['DEPT_ID', 'SAL']

4.    批量插入

大量插入插入操作,可以使用python的批量插入功能,无需多次单独调用insert,这样可以提升性能。参考后面示例代码。


5.    示例代码

''' 
Created on 2016年7月7日 
@author: Tommy 
''' 
import cx_Oracle 
 
class Oracle(object): 
    """  oracle db operator  """ 
    def __init__(self,userName,password,host,instance): 
        self._conn = cx_Oracle.connect("%s/%s@%s/%s" % (userName,password,host,instance)) 
        self.cursor = self._conn.cursor() 
         
    def queryTitle(self,sql,nameParams={}): 
        if len(nameParams) > 0 : 
            self.cursor.execute(sql,nameParams) 
        else: 
            self.cursor.execute(sql) 
 
        colNames = [] 
        for i in range(0,len(self.cursor.description)): 
            colNames.append(self.cursor.description[i][0]) 
         
        return colNames 
     
    # query methods 
    def queryAll(self,sql): 
        self.cursor.execute(sql) 
        return self.cursor.fetchall() 
     
    def queryOne(self,sql): 
        self.cursor.execute(sql) 
        return self.cursor.fetchone() 
     
    def queryBy(self,sql,nameParams={}): 
        if len(nameParams) > 0 : 
            self.cursor.execute(sql,nameParams) 
        else: 
            self.cursor.execute(sql) 
             
        return self.cursor.fetchall() 
     
    def insertBatch(self,sql,nameParams=[]): 
        """batch insert much rows one time,use location parameter""" 
        self.cursor.prepare(sql) 
        self.cursor.executemany(None, nameParams) 
        self.commit() 
     
    def commit(self): 
        self._conn.commit() 
     
    def __del__(self): 
        if hasattr(self,'cursor'):  
            self.cursor.close() 
             
        if hasattr(self,'_conn'):  
            self._conn.close() 
 
 
 
 
 
def test1(): 
    # sql = """select user_name,user_real_name,to_char(create_date,'yyyy-mm-dd') create_date from sys_user where id = '10000' """ 
    sql = """select user_name,user_real_name,to_char(create_date,'yyyy-mm-dd') create_date from sys_user where id =: id """ 
    oraDb = Oracle('test','java','192.168.0.192','orcl') 
     
    fields = oraDb.queryTitle(sql, {'id':'10000'}) 
    print(fields) 
     
    print(oraDb.queryBy(sql, {'id':'10000'})) 
 
def test2(): 
    oraDb = Oracle('test','java','192.168.0.192','orcl') 
    cursor = oraDb.cursor 
     
    create_table = """ 
    CREATE TABLE python_modules ( 
    module_name VARCHAR2(50) NOT NULL, 
    file_path VARCHAR2(300) NOT NULL 
    ) 
    """ 
    from sys import modules 
       
    cursor.execute(create_table) 
    M = [] 
    for m_name, m_info in modules.items(): 
        try: 
            M.append((m_name, m_info.__file__)) 
        except AttributeError: 
            pass 
       
    sql = "INSERT INTO python_modules(module_name, file_path) VALUES (:1, :2)" 
    oraDb.insertBatch(sql,M) 
       
    cursor.execute("SELECT COUNT(*) FROM python_modules") 
    print(cursor.fetchone()) 
    print('insert batch ok.') 
     
    cursor.execute("DROP TABLE python_modules PURGE") 
 
test2() 




本文参考链接:https://blog.csdn.net/neweastsun/article/details/51852304