Study/django

DB를 먼저 설계하고 django와 연동하기 (6) - 함수와 lock 사용하기

bluebamus 2023. 12. 14. 01:08

github 저장소 : https://github.com/bluebamus/synchronization-test-between-django-and-db

 

1. DB를 먼저 설계하고 django와 연동하기 (1) - 작업 정의

2. DB를 먼저 설계하고 django와 연동하기 (2) - migration 후 db 추가 설계 그리고 inspectdb

3. DB를 먼저 설계하고 django와 연동하기 (3) - inspectdb로 생성된 models.py로 migrations 후 비교

4. DB를 먼저 설계하고 django와 연동하기 (4) - fake를 사용해 migrate 후 models.py 수정 반영하기

5. DB를 먼저 설계하고 django와 연동하기 (5) - view table 테스트

6. DB를 먼저 설계하고 django와 연동하기 (6) - 함수와 lock 사용하기

 1. CURD 중 update에 대한 함수를 만들고 코드로 사용하기

   - 함수를 만든다.

      - stock_order 테이블의 item_id를 기준으로 조건에 맞는 행들을 가져와 order_quantity의 값을 전달한 값만큼 더한다.

      - 반환 값은 item_id로 찾은 행들의 수가 된다.

   - 코드

      - RETURNING INTO  : 선택한 데이터를 변수에 담는 “SELECT ~ INTO”처럼, RETURNING INTO 절은 INSERT, UPDATE, DELETE문이 실행되면서 삽입, 수정 혹은 삭제된 데이터를 변수에 담는 역할을 수행한다.

CREATE OR REPLACE FUNCTION update_quantity_i(i_id INT, number INT) RETURNS INT AS $$
DECLARE
    updated_rows INT := 0;
BEGIN
    WITH updated_rows_cte AS (
        UPDATE stock_order 
        SET order_quantity = order_quantity + number 
        WHERE item_id = i_id
        RETURNING item_id
    )
    SELECT COUNT(*) INTO updated_rows FROM updated_rows_cte;
    
    RETURN updated_rows;
END;
$$ LANGUAGE plpgsql;

# item_id = 10, 1만큼 더한다.
SELECT update_quantity_i(10,1);

# item_id 10에 대한 결과 값 확인
select * from stock_order so where item_id=10;

 

   - 결과

 

 2. 프로시저 lock  사용해보기

   1) lock 정리

      - 아래 샘플 코드들은 시나리오 혹은 기존 테이블과 상관 없이 생성된 코드 입니다.

      - PostgreSQL에서 Stored Procedure에서 사용 가능한 주요 잠금 방법은 두 가지입니다: Row-Level Lock 및 Table-Level Lock입니다.

         1] Row-Level Lock: 이 방법은 특정 행에만 잠금을 적용하는 방법입니다. PostgreSQL에서는 SELECT ... FOR UPDATE, SELECT ... FOR NO KEY UPDATE, SELECT ... FOR SHARE, SELECT ... FOR KEY SHARE 등의 구문을 사용하여 Row-Level Lock을 적용할 수 있습니다.

CREATE OR REPLACE PROCEDURE update_stock_row_lock()
LANGUAGE plpgsql
AS $$
BEGIN
   FOR rec IN SELECT * FROM item FOR UPDATE
   LOOP
      IF rec.count < 30 THEN
         INSERT INTO stock (fk, order_stock) VALUES (rec.id, 100);
      END IF;
   END LOOP;
END; 
$$;

CALL update_stock_row_lock();

 

         2] Table-Level Lock: 이 방법은 테이블 전체에 잠금을 적용하는 방법입니다. PostgreSQL에서는 LOCK TABLE ... IN ACCESS EXCLUSIVE MODE, LOCK TABLE ... IN ROW EXCLUSIVE MODE 등의 구문을 사용하여 Table-Level Lock을 적용할 수 있습니다.

CREATE OR REPLACE PROCEDURE update_stock_table_lock()
LANGUAGE plpgsql
AS $$
BEGIN
   LOCK TABLE item IN ACCESS EXCLUSIVE MODE;

   FOR rec IN SELECT * FROM item
   LOOP
      IF rec.count < 30 THEN
         INSERT INTO stock (fk, order_stock) VALUES (rec.id, 100);
      END IF;
   END LOOP;
END; 
$$;

CALL update_stock_table_lock();

 

   2) 함수에서 Row-Level Lock 사용하기

      - FOR UPDATE NOWAIT 구문을 사용하여 다른 트랜잭션이 해당 행을 사용 중일 때 즉시 오류를 반환하도록 설정했습니다. 이를 통해 동시에 여러 사용자가 동시에 잔액을 변경하는 상황을 방지할 수 있습니다.

CREATE OR REPLACE FUNCTION update_user_balance(user_id INT, amount NUMERIC) RETURNS VOID AS $$
BEGIN
    -- 해당 사용자의 잔액 행에 잠금 설정
    -- FOR UPDATE 문을 사용하여 특정 행에 대한 잠금을 설정할 수 있습니다.
    -- NOWAIT 옵션을 사용하면 다른 트랜잭션이 해당 행을 사용 중일 때 바로 오류를 반환합니다.
    -- 예를 들어, 사용자의 잔액을 업데이트할 때 다른 트랜잭션이 동시에 잔액을 변경하는 것을 방지할 수 있습니다.
    UPDATE user_balance
    SET balance = balance + amount
    WHERE user_id = user_id
    FOR UPDATE NOWAIT;
    
    -- 필요한 추가 작업 수행
    -- 이 부분에는 사용자 잔액 변경 이후에 수행해야 할 작업을 추가할 수 있습니다.
    
    -- 트랜잭션 커밋
    COMMIT;
END;
$$ LANGUAGE plpgsql;

 

   3) 프로시저 lock 없이 race condition test 실행

      - 테스트에 사용할 postgresql 함수는 더 쉬운 동작을 하는 것으로 새로 생성하였다.

      - sql 코드

CREATE OR REPLACE FUNCTION sync_test.update_quantity(p_id integer, number integer)
 RETURNS integer
 LANGUAGE plpgsql
AS $function$
DECLARE
    updated_rows INT;
BEGIN
    UPDATE stock_order SET order_quantity = order_quantity + number WHERE id = p_id RETURNING id INTO updated_rows;
    RETURN updated_rows;
END;
$function$
;

 

      - django에서 orm 코드를 사용해 직접 여러개의 쿼리를 요청해본다.

      - 코드

def add_stock_order(product_id):
    so = StockOrder.objects.get(id=product_id)
    for _ in range(30):
        if so.order_quantity >= 0:
            so.order_quantity += 1
            so.save()
            #print("th id : ", threading.current_thread().ident, " : ", so.order_quantity,"\n")
thread1 = threading.Thread(target=add_stock_order, args=(result.id,))
thread2 = threading.Thread(target=add_stock_order, args=(result.id,))
thread1.start()
thread2.start()

thread1.join()
thread2.join()
result.refresh_from_db()
print(result.order_quantity)

 

      - 결과 :

         - 같은 동작을 여러번 수행해봐도 제대로 된 결과가 나오지 않는다. 오류가 매번 발생된다.

         - 이 경우 race condition이 발생한다고 예상할 수 있다.

1800034

 

      - postgresql 함수를 사용한다.

      - 테스트 코드는 race_condition_test.ipynb로 github 저장소에 있다.

      - 테스트 코드

def call_update_stock(id, number):
    for _ in range(30000):
        with connection.cursor() as cursor:
            sql = f"SELECT update_quantity({id}, {number})"
            cursor.execute(sql)
            result = cursor.fetchone()  # 단일 행 결과를 가져옴
            # result = cursor.fetchall()  # 모든 행 결과를 가져옴
    
            # 쿼리 결과를 확인하고 처리할 내용을 추가로 작성하세요.
            if result is not None:
                # 쿼리 결과가 존재하는 경우에 대한 처리
                # result 변수에 반환된 결과가 저장됩니다.
                # 원하는 작업을 수행하세요.
                #print(result)
                pass
            else:
                # 쿼리 결과가 없는 경우에 대한 처리
                # 예외 상황 등에 대한 처리 코드를 작성하세요.
                print("쿼리 결과가 없습니다.")
def multi_thread_t2():
    threads = []
    for i in range(60):
        thread = threading.Thread(target=call_update_stock, args=(result.id,1,))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    result.refresh_from_db()
    print(result.order_quantity)

execution_time = timeit.timeit(multi_thread_t2,number=1)

print(f'함수 실행 시간: {execution_time}초')

 

 

      - 결과 :

         - 컴퓨터 자원에서 생성 가능한 최대 쓰레드 수와 쿼리 요청 수를 증가시켜도 제대로 된 결과가 반환된다.

         - deadlock이 생기지만, db는 idle한 상태로 확인되고 소프트웨어에서만 문제가 생겼다.

1800000
함수 실행 시간: 358.2501574999915초

 

   4) 프로시저 lock을 실행하고 race condition test 실행

      - 30000개의 쿼리를 요청하는 80개 이상의 쓰레드를 만들어 실행했다.

         - amd 5900x, amd 3500 두 cpu에서 테스트를 했으며 각각 윈도우, 우분투 환경이다.

         - cpu 사용률이 두 경우 모두 절반 수준을 유지했다. dbeaver에서 select를 이용해 데이터의 업데이트를 계속 확인하였다. 쓰레드를 너무 많이 생성하여 발생하는 소프트웨어의 deadlock이 발생하는 경우가 있었지만, postgresql에서 lock이나 active 상태의 테이블을 확인할 수 없었다.

         - query를 직접 사용하여 강제로 발생시킬 수 있지만, 내가 원한건 django & python의 소프트웨어 요청에 있어 lock을 사용할 여지가 있을까였다. 

 

      - 결론 :

         - lock에 대한 사용 예시는 위에서 설명했기에 현재 상에서 더 이상 테스트를 할 수 없어 종료한다.

 

 3. 테스트 결론

   - django에서 몇가지 lock의 기능을 제공하는 방법을 제공한다. 하지만 제일 좋은 것은 프로시저 및 함수를 직접 database에 작성하는 것이다.

   - redis를 사용하는 경우, 1:1 연결만 이루어 진다면 하나의 쓰래드로만 동작하는 redis에 lock은 따로 필요 없다. 하지만 분산 시스템이 구축되는 경우는 따로 고려해야 한다.

      - 하나의 redis 서버에 여러 수의 client가 연결을 시도하면 동시 접속에 의한 문제가 생길 수 있다.

   - 많은 문서를 통해 얻은 결론은, 한가지 방법으로 해결되는 문제가 아니다. 여러가지 방법을 복합적으로 사용하여 문제 발생의 확률을 최대한 낮추고 억제하는 것 뿐이다.

   - 제일 중요한 것은 설계이다. 설계를 잘하면 최소한의 기능으로 최대의 효율을 얻을 수 있다. 

 

 

 - reference :

https://bongra.tistory.com/52

 

[Postgres] Procedure(프로시저), Function(함수) 생성 및 사용

이번에 프로젝트를 하면서 프로시저 이야기가 나와서 한번 정리를 해보았다. 오라클을 공부할 때는 프로시저를 많이 들어봤었는데, Postgres에서는 프로시저에 대한 내용을 찾을 수 없어 구글링

bongra.tistory.com

https://a-proteur.tistory.com/58

 

저장 프로시저 (Stored Procedure, SP)

저장 프로시저 (Stored Procedure, SP) "저장 프로시저 또는 스토어드 프로시저(stored procedure)는 일련의 쿼리를 마치 하나의 함수처럼 실행하기 위한 쿼리의 집합이다." _위키백과 "데이터베이스에 대한

a-proteur.tistory.com

https://blog.gaerae.com/2015/10/postgresql-insert-update-returning.html

 

PostgreSQL: INSERT, UPDATE, DELETE 실행 결과 리턴 받기 (WHEN / RETURNING)

 

blog.gaerae.com

https://thebook.io/006696/0549/

 

오라클 SQL과 PL/SQL을 다루는 기술: RETURNING INTO 절을 이용한 디버깅

더북(TheBook): (주)도서출판 길벗에서 제공하는 IT 도서 열람 서비스입니다.

thebook.io