::: IT인터넷 :::

psycopg2로 PostgreSQL CRUD 클래스 구현하기

곰탱이푸우 2022. 6. 2. 08:20
psycopg2의 개요와 자주 사용하는 함수들에 대해 알아보았다.
해당 내용은 아래 포스팅을 참고한다.
해당 함수들을 그대로 사용해도 되지만, 두 개의 함수를 연속적으로 호출해야 하므로 번거롭다.
  • connect와 cursor
  • execute와 commit
  • execute와 fetchone / fetchmany / fetchall
  • conn.close와 cur.close
 
그리고 외부 라이브러리를 래핑(Wraping)해서 사용하는 것의 장점이 많다.
해당 내용은 아래 포스팅에서 다룬 psycopg2-binary 소개 부분을 참고한다.
아래 특정 기능 중심으로 정리하고 향후 필요한 기능은 분리하여 추가한다.
예제 코드를 활용하여 정리한 것으로 일부 미흡한 부분이 있음을 감안한다.
 
 

연결

psycopg2의 connect와 cursor 함수를 사용한다.
클래스의 인스턴스가 생성될때 연결되고, 인스턴스가 제거 될때 연결이 해제되면 좋을 것이다.
 
따라서 클래스의 생성자 부분에서 connect와 cursor 함수를 호출한다.
connect 함수는 PostgreSQL의 주소, 포트, 계정, 비밀번호, Database 이름을 전달 받는다.
 
따라서 연결에 필요한 정보들을 생성자 함수의 인자로 전달받고, 유효성을 체크하는 부분을 추가하면 더 좋다. 
imort psycopg2

class PostgresDB():
    def __init__(self, host, port, dbname, user, password):
        self.host=host
        self.port=port
        self.dbname=dbname
        self.user=user
        self.password=password

        try:
            self.conn = psycopg2.connect(host=self.host, port=self.post, dbname=self.dbname, 
                                         user=self.user, password=user.password)
        except Exception as e:
            print(e)
        else:
            # 예외가 발생하지 않은 경우에만 cursor 함수 호출
            self.cur = self.conn.cursor()

 

계정명과 password가 노출되는 것이 꺼려질 경우, 적절한 다른 방법을 사용한다.
 

해제

psycopg2의 connect와 cursor 인스턴스의 close 함수를 사용한다.
인스턴스가 제거될때 연결이 해제되어야 한다.
 
따라서 소멸자에서 connect와 cursor의 close 함수를 호출한다.
class PostgresDB():
    def __del__(self):
        self.cur.close()
        self.conn.close()
 
 

실행

SQL 명령을 처리하기 위한 함수를 구현한다. cursor 객체의 execute 함수를 사용한다.
해당 부분은 아래에서 정의할 CRUD 함수들이 내부에서 호출한다.
 
데이터를 읽는 SELECT 구문은 fetchone, fetchmany, fetchall 함수 등을 사용해야 한다.
그 외의 구문들은 Database의 상태를 변경하기 때문에 commit 함수를 사용해야 한다.
 
따라서 구현하는 함수에서는 두 경우를 나눠서 구현해야 한다.
class PostgresDB(): 
    def execute(self, query, msg=""):
        assert query is not None, "query is not allowed None"
        assert msg is not None, "msg is not allowed None"

        try:
            self.cur.execute(query)
    
            is_select = query.upper().startswith('SELECT ')
            result = None
            
            if is_select:
                result = self.cur.fetchall()
            else:
                self.conn.commit()
            
            return result
        except Exception as e:   # 쿼리 실행에 오류가 발생한 경우  
            print("Error Occured in {msg} Data!".format(msg=msg), e)
 

스키마와 테이블명 생성

쿼리할 때 Schema가 지정 되지 않고 테이블 이름만 사용할 경우, Schema의 SEARCH_PATH에 따라 테이블을 탐색한다.
따라서 CRUD 함수를 호출할 때 Schema 이름의 전달 여부에 따라 테이블명 지정을 다르게 해야 한다.
 
다음과 같이 구현한다.
class PostgresDB():
    def make_table_name(self, schema_name, table_name):
        assert schema_name is not None, "schema_name is not allowed None"
        assert table_name is not None, "table_name is not allowed None"

        # schema가 지정되지 않으면 table_name만 사용
        # schema의 SEARCH_PATH에 따라 테이블 탐색
        if schema_name == "":
            schema_table = table_name
        # schema가 지정되어 있으면 schema_name.table_name 형태로 생성
        else:
            schema_table = ".".join([schema_name, table_name])

        return schema_table

 

 

입력

레코드를 데이터베이스의 특정 테이블에 입력하는 함수를 구현한다.
앞에서 cursor 객체의 execute 함수를 래핑(Wraping)한 execute 함수를 사용한다.
 
데이터를 입력하는 INSERT INFO 구문은  Database의 상태를 변경한다.
따라서 execute 함수 내부의 commit 함수가 호출된다.
 
필요한 인자는 다음과 같다.
인자명
내용
기본값
table_name
테이블 이름
 
column_name
컬럼 이름
 
data
입력할 데이터
 
schema_name
Schema 이름
""
 
다음과 같이 구현한다.
class PostgresDB(): 
    def insert(self, table_name, column_name, data, schema_name=""):
        assert table_name is not None, "table_name is not allowed None value!"
        assert column_name is not None, "column_name is not allowed None"
        assert data is not None, "data is not allowed None"
        assert schema_name is not None, "schema_name is not allowed None"

        schema_table = self.make_table_name(schema_name, table_name)

        insert_sql = "INSERT INTO {schema_table} ({column}) VALUES ('{data}');"
            .format(schema_table=schema_table, column=column_name, data=data)

        result = self.execute(insert_sql, "Insert")
       
        return result

 

조회

데이터베이스의 특정 테이블에서 레코드를 입력하는 함수를 구현한다.
앞에서 cursor 객체의 execute 함수를 래핑(Wraping)한 execute 함수를 사용한다.
 
데이터를 조회하는 SELECT 구문은  Database의 상태 변경이 없고, 조회 결과를 메모리에 가져와야 한다.
따라서 execute 함수 내부의 fetch_all 함수가 호출된다.
 
필요한 인자는 다음과 같다.
인자명
내용
기본값
table_name
테이블 이름
 
columns
컬럼 이름
 
conditions
검색 조건
""
schema_name
Schema 이름
""
 
다음과 같이 구현한다.
class PostgresDB(): 
    def select(self, table_name, columns, conditions="", schema_name=""):
        assert table_name is not None, "table_name is not allowed None value!"
        assert columns is not None, "columns is not allowed None"
        assert conditions is not None, "conditions is not allowed None"
        assert schema_name is not None, "schema_name is not allowed None"

        schema_table = self.make_table_name(schema_name, table_name)

        select_sql = "SELECT {columns} FROM {schema_table};"
            .format(columns=columns, schema_table=schema_table)

        # 검색 조건이 있으면 쿼리문 뒷 부분에 추가
        if conditions != "":
            select_sql.replace(";", " WHERE {conditions};".format(conditions = conditions))

        result = self.execute(select_sql, "Select")

        return result

 

 

수정

데이터베이스의 특정 테이블에서 레코드를 수정하는 함수를 구현한다.
앞에서 cursor 객체의 execute 함수를 래핑(Wraping)한 execute 함수를 사용한다.
 
데이터를 입력하는 INSERT INFO 구문은  Database의 상태를 변경한다.
따라서 execute 함수 내부의 commit 함수가 호출된다.
 
필요한 인자는 다음과 같다.
인자명
내용
기본값
table_name
테이블 이름
 
column_name
컬럼 이름
 
value
수정할 값
 
conditions
검색 조건
""
schema_name
Schema 이름
""
 
다음과 같이 구현한다.
class PostgresDB(): 
    def update(self, table_name, column_name, value, conditions="", schema_name=""):
        assert table_name is not None, "table_name is not allowed None value!"
        assert column_name is not None, "column_name is not allowed None"
        assert value is not None, "value is not allowed None"
        assert conditions is not None, "conditions is not allowed None"
        assert schema_name is not None, "schema_name is not allowed None"

        schema_table = self.make_table_name(schema_name, table_name)

        update_sql = "UPDATE {schema_table} SET {column}='{value}';"
              .format(schema_table=schema_table, column=column_name, value=value)

        # 검색 조건이 있으면 쿼리문 뒷 부분에 추가
        if conditions != "":
            update_sql.replace(";", " WHERE {conditions};".format(conditions = conditions))

        result = self.execute(update_sql, "Update")
       
        return result

 

삭제

데이터베이스의 특정 테이블에서 레코드를 삭제하는 함수를 구현한다.
앞에서 cursor 객체의 execute 함수를 래핑(Wraping)한 execute 함수를 사용한다.
 
데이터를 삭제하는 DELETE 구문은  Database의 상태를 변경한다.
따라서 execute 함수 내부의 commit 함수가 호출된다.
 
필요한 인자는 다음과 같다.
인자명
내용
기본값
table_name
테이블 이름
 
conditions
검색 조건
""
schema_name
Schema 이름
""
 
다음과 같이 구현한다.
class PostgresDB():
    def delete(self, table_name, conditions="", schema_name=""):
        assert table_name is not None, "table_name is not allowed None value!"
        assert conditions is not None, "conditions is not allowed None"
        assert schema_name is not None, "schema_name is not allowed None"

        schema_table = self.make_table_name(schema_name, table_name)

        delete_sql = "DELETE FROM {schema_table};".format(schema_table=schema_table)
        # 검색 조건이 있으면 쿼리문 뒷 부분에 추가
        if conditions != "":
            delete_sql.replace(";", " WHERE {conditions};".format(conditions = conditions))

        result = self.execute(delete_sql, "Delete") 
       
        return result

 

 

테이블 생성

기본적인 CRUD 구현은 위의 Insert, Update, Select, Delete면 된다.
 
그러나 SQL 구문은 아주 많고, 다양한 SQL을 실행하기 위해 execute 함수를 구현했다.
해당 execute 함수는 Insert, Update, Select, Delete 기능 구현에서도 호출하여 사용했다.
 
CRUD 함수 외에 사용이 빈번한 쿼리는 래핑 클래스의 함수로 구현하고 사용하면 편리하다.
자주 사용하는 쿼리를 별도 함수로 추가해야 하는 경우 참고하여 진행한다.
 
아래는 예제 함수로 테이블을 생성하는 기능을 정의한다.
사용하려는 Schema와 테이블이 PostgreSQL에 존재하는지 확인한다.
이미 존재하면 메시지만 출력하고, 없으면 생성한다.
 
스키마가 있는지 확인하고, 없으면 오류 메시지를 출력한다. 있으면 테이블이 있는지 확인한다.
테이블이 있으면 오류 메시지를 출력한다. 없으면 테이블을 생성한다.
class PostgresDB(): 
    def create_table(self, schema_name, table_name, columns):
        # columns는 [컬럼명 데이터타입 제약조건] 등으로 정의 된 문자열 형태
        # CREATE TABLE의 예제를 참고한다.

        # 전달 받은 인자가 None 값인지 확인       
        assert schema_name is not None, "schema_name is not allowed None value!"
        assert table_name is not None, "table_name is not allowed None value!"
        assert columns is not None, "columns is not allowed None"
               
        # PostgreSQL에 사용할 테이블이 존재하는지 확인
        search_sql = "SELECT pg_tables.schemaname, pg_tables.tablename FROM pg_catalog.pg_tables "
        search_sql += f"WHERE tablename='{table_name}' AND schemaname='{schema_name}';"
       
        result = None
        try:
            result = self.execute(search_sql, "Check Table")
        except Exception as e:  # 쿼리 실행에 오류가 발생한 경우
            result = ("Error Occured in Search Schema!", e)
            return result

        # 이미 테이블이 존재하는 경우 생성하지 않음
        if result:
            print("Table is already exist!")
            return result

        schema_table = self.make_table_name(schema_name, table_name)

        # 테이블 생성 진행
        create_sql = "CREATE TABLE {schema_table} ({columns});"
            .format(schema_table=schema_table, columns=columns)       
        result = self.execute(create_sql, "Create Table")
       
        return result