본문 바로가기
Python/Data Engineering

Ray를 활용한 Python 병렬 처리 하기 (feat. gpt api)

by JONGSKY 2023. 8. 22.
728x90
SMALL

1. 글을 작성하게 된 계기

 

- GPT API를 호출하는 경우가 많다.

- GPT API를 호출할 때 순차적으로 하면 너무 오래 걸린다.

- API 호출을 병렬로 처리해서 처리해 보자.

 

2. Ray 이해하기

 

2-1. Ray 구성 이해하기

 

1. Task

- 호출하는 곳과 다른 프로레스에서 실행되는 함수 또는 클래스

- @ray.remote 라는 데코레이터로 감싸면 그 함수는 task가 된다.

- Remote Function이라 부르며, 호출자와는 비동기적으로 실행

- remote()를 사용해서 호출 가능, ObjectRef 라는 값을 반환한다. 이때, ray.get(ObjectRef)를 하면 Task를 실행하고 값을 반환받을 수 있음.

 

2. Object

- Task를 통해 반환되거나 ray.put()을 통해 생성되는 값

- 데이터의 크기가 큰 경우 ray.put()을 통해 Object로 만들어 Ray에서 빠르게 사용 가능.

  다만 Object는 Spark의 RDD와 같이 Immutable(불변성)입니다.

 

3. Actor

- Stateful한 워커 프로세스입니다.

- @ray.remote를 통해서 Actor Class로 만들 수 있고, 이 클래스의 메서드 호출은 Stateful Task가 됩니다.

 

 

2-2. 코드 사용방법

 

import ray

@ray.mote
def load_txt_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return content
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return None
    except Exception as e:
        print(f"An error occurred while loading the file: {e}")
        return None
        
ray.init()

result = load_txt_file.remote([file_path, file_path])
data = ray.get(result)

ray.shutdown()

 

 

1. import ray: Ray 프레임워크를 임포트하고 사용할 준비를 합니다.
2. @ray.remote: 이 데코레이터는 함수를 Ray를 이용하여 원격으로 실행 가능한 함수로 만듭니다. 이렇게 만들어진 함수는 병렬 및 분산 작업을 수행하기 위해 여러 워커(worker)에서 실행될 수 있습니다.
3. def load_txt_file(file_path): file_path라는 하나의 인자를 받는 함수를 정의합니다. 이 함수는 텍스트 파일을 로드하는 작업을 수행합니다.
4. try:: 예외 처리를 시작합니다. 코드 블록 안에서 예외가 발생할 수 있는 부분을 포함시킵니다.
5. with open(file_path, 'r', encoding='utf-8') as file:: 주어진 파일 경로(file_path)를 사용하여 파일을 엽니다. with 문을 사용하면 파일 작업이 끝나면 자동으로 파일을 닫을 수 있습니다. 파일은 읽기 모드('r')로 열리며, UTF-8 인코딩으로 읽습니다.
6. content = file.read(): 파일 내용을 읽어 변수 content에 저장합니다.
7. return content: 파일의 내용을 반환합니다.
8. except FileNotFoundError:: 파일을 찾지 못한 경우에 대한 예외 처리입니다. 파일이 존재하지 않을 때 해당 블록이 실행되며, 에러 메시지를 출력하고 None을 반환합니다.
9. except Exception as e:: 그 외의 모든 예외 상황에 대한 예외 처리입니다. 어떤 예외가 발생하든 해당 블록이 실행되며, 발생한 예외에 대한 메시지를 출력하고 마찬가지로 None을 반환합니다.
10. ray.init(): Ray 클러스터를 초기화합니다. 이렇게 하면 Ray의 분산 기능을 활용할 수 있습니다.
11. result = load_txt_file.remote([file_path, file_path]): 원격으로 실행 가능한 load_txt_file 함수를 호출합니다. .remote() 메서드를 사용하여 원격 함수 호출을 지시하며, 두 개의 동일한 file_path 인자를 리스트로 묶어 전달하고 있습니다.
12. data = ray.get(result): 원격 함수 호출 결과를 가져와서 data 변수에 저장합니다.
13. ray.shutdown(): Ray 클러스터를 종료합니다. Ray 작업이 끝나면 클러스터를 종료하는 것이 좋습니다.

 

 

GPT API에 적용한 결과

API를 활용해 Ray를 사용했지만, 적은 데이터에 대해서는 병렬을 사용했다고 해서 큰 차이가 없었다.

오히려 더 느려진 느낌을 받았다.

아마 OPENAPI 자체에서 API 요청을 받았을 때 처리하는 방식이 있지 않을까 하는 생각이 든다.

그리고 데이터가 커지면 당연하게 분당 token이 많아져서 limit 에러를 보인다.

728x90
LIST