[udemy] Web Scraping with BeautifulSoup, Selenium, Scrapy and Scrapy-Playwright. 4 Project-like Exercises + 4 Real Life Projects 학습 정리
강의 정보 : https://www.udemy.com/course/complete-python-web-scraping-real-projects-modern-tools/
github 정보 : https://github.com/AlpBCan/Web-Scraping-Course-Content
1. INTRODUCTION
1) Checking If the Website is Static or Dynamic
1. 개발자 도구를 열고 network 탭에서 disable cache를 체크한다.
2. Ctrl +Shift + P을 눌러 명령어를 열고 disable javascript를 입력한다.
3. 이후 reload 버튼을 눌렀을 때, 화면이 출력되지 않으면 동적 페이지이다.
2. BEAUTIFUL SOUP and REQUESTS 1 - BASICS
1) url의 페이지별 책 정보 가져오기
- https://books.toscrape.com/ 사이트를 방문하여 페이지를 넘어가면서 각 페이지의 책 정보를 엑셀 파일에 저장하는 프로젝트이다.
- book_dict으로 pandas의 데이터프레임으로 변환할 딕셔너리 양식을 미리 만들고 각 항목에 파싱한 정보를 append하여 나중에 엑셀의 열로 저장될 항목들에 저장해 나간다.
- 모든 for이 종료되면, 최종적으로 pd.DataFrame를 사용해 데이터프레임을 만들고 이를 엑셀로 저장한다.
- mina.py
import requests
from bs4 import BeautifulSoup
import pandas as pd
import openpyxl
with open('image_links.txt', 'w') as f:
pass
book_dict = {
'name': [],
'price': [],
'category': [],
'stars': [],
'upc': [],
'availability': [],
'in_stock': [],
'image_link': []
}
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36'}
number_dict = {'One': '1', 'Two': '2', 'Three': '3', 'Four': '4', 'Five': '5'}
url = 'https://books.toscrape.com/'
response = requests.get(url, headers=headers)
page_html = response.text
soup = BeautifulSoup(page_html, 'html.parser')
page_count_string = soup.find('li', class_='current').text
page_count = int(page_count_string.strip().split(' ')[-1])
for page_no in range(1, page_count + 1):
print(f'page --> {page_no}')
page_url = f'https://books.toscrape.com/catalogue/page-{page_no}.html'
response = requests.get(page_url, headers=headers)
page_html = response.text
soup = BeautifulSoup(page_html, 'html.parser')
books = soup.find_all('article', class_='product_pod')
for book in books:
book_url = 'https://books.toscrape.com/catalogue/' + book.find('a')['href']
response = requests.get(book_url, headers=headers)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
name = soup.find('div', class_='product_main').h1.text
book_dict['name'].append(name)
price = soup.find('div', class_='product_main').p.text
book_dict['price'].append(price)
ul_container = soup.find('ul', class_='breadcrumb')
li_items = ul_container.find_all('li')
category = li_items[2].a.text
book_dict['category'].append(category)
star_p_element = soup.find('p', class_='star-rating')
star_class_name_list = star_p_element['class']
star_string = star_class_name_list[1]
stars = number_dict[star_string]
book_dict['stars'].append(stars)
upc_th = soup.find('th', string='UPC')
upc = upc_th.find_next_sibling().text
book_dict['upc'].append(upc)
availability_th = soup.find('th', string='Availability')
availability = availability_th.find_next_sibling().text
book_dict['availability'].append(availability)
# In stock (22 available)
# 22 available)
in_stock = availability.split('(')[1].split(' ')[0]
book_dict['in_stock'].append(in_stock)
image_link = 'https://books.toscrape.com/' + soup.find('div', class_='thumbnail').img['src'][6:]
book_dict['image_link'].append(image_link)
with open('image_links.txt', 'a') as f:
f.write(image_link + '\n')
df = pd.DataFrame(book_dict)
df.to_excel('book.xlsx')
2) 하나의 북에 대한 디테일 정보 가져오기
- 엘리멘트를 접근하는 다양한 방법을 제시한다.
import requests
from bs4 import BeautifulSoup
number_dict = {'One': '1', 'Two': '2', 'Three': '3', 'Four': '4', 'Five': '5'}
book_url = 'https://books.toscrape.com/catalogue/a-light-in-the-attic_1000/index.html'
response = requests.get(book_url)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
# price, category, stars, upc, availability, in_stock, image_link
name = soup.find('div', class_='product_main').h1.text
print(name)
price = soup.find('div', class_='product_main').p.text
print(price)
ul_container = soup.find('ul', class_='breadcrumb')
li_items = ul_container.find_all('li')
category = li_items[2].a.text
print(category)
star_p_element = soup.find('p', class_='star-rating')
star_class_name_list = star_p_element['class']
star_string = star_class_name_list[1]
stars = number_dict[star_string]
print(stars)
upc_th = soup.find('th', string='UPC')
upc = upc_th.find_next_sibling().text
print(upc)
availability_th = soup.find('th', string='Availability')
availability = availability_th.find_next_sibling().text
print(availability)
# In stock (22 available)
# 22 available)
in_stock = availability.split('(')[1].split(' ')[0]
print(in_stock)
image_link = 'https://books.toscrape.com/' + soup.find('div', class_='thumbnail').img['src'][6:]
print(image_link)
3) 이미지 다운로드
- response = requests.get() 해당 코드에서 response는 메모리상으로 이미지 데이터를 가지고 있다.
- 때문에 파일을 write(), shutil.copyfileobj(), shutil.copy(), PIL.Image.open() 등을 사용하여 저장할 수 있다.
for i, image_url in enumerate(links_list):
response = requests.get(image_url, headers=headers)
with open(f"book_images/{i+1}.jpg", "wb") as imagefile:
imagefile.write(response.content)
4) 프록시 사용방법 - 일반 사용법
- 방법은 간단하다. requests.get() 파라미터에 proxies=proxies 정보를 정의하면 된다.
import requests
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
with open('proxy_list.txt', 'r') as p:
proxy_string = p.read()
proxy_list = proxy_string.split('\n')
for proxy in proxy_list:
my_proxy = proxy
proxy_url = f"http://{my_proxy}"
proxies = {
"http": proxy_url,
"https": proxy_url
}
url = "https://books.toscrape.com"
try:
response = requests.get(url, proxies=proxies, headers=headers)
if response.status_code == 200:
print(f"Request is successful. Proxy: {proxy}")
else:
print(f"Request failed. Proxy: {proxy} Status Code: {response.status_code}")
except Exception as e:
print(f"Request failed. Proxy: {proxy}. Error: {e}")
5) 프록시 사용방법 - 인증 사용법
- 차이가 있다면 proxy_url = f"http://{username}:{password}@{proxy}" 코드가 url이 된다는 것이다.
import requests
import username_password
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
with open('proxy_list.txt', 'r') as p:
proxy_string = p.read()
proxy_list = proxy_string.split('\n')
for proxy in proxy_list:
my_proxy = proxy
username = username_password.username
password = username_password.password
proxy_url = f"http://{username}:{password}@{proxy}"
proxies = {
"http": proxy_url,
"https": proxy_url
}
url = "https://books.toscrape.com"
try:
response = requests.get(url, proxies=proxies, headers=headers)
if response.status_code == 200:
print(f"Request is successful. Proxy: {proxy}")
else:
print(f"Request failed. Proxy: {proxy} Status Code: {response.status_code}")
except Exception as e:
print(f"Request failed. Proxy: {proxy}. Error: {e}")
2. SELENIUM 1 - BASICS
1) css selectors를 이용한 파싱 방법
1. 파싱만을 위해 기본으로 --headless를 설정한다.
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
url = 'https://books.toscrape.com/'
service = Service(ChromeDriverManager().install())
options = webdriver.ChromeOptions()
options.add_argument('--headless')
driver = webdriver.Chrome(service=service, options=options)
2) 기본 사용 방법
- 기본적으로 driver.get()으로 페이지를 가져오고 find_element() 혹은 find_elements()에 가져올 대상을 정의한다.
- 정의할 때, By 메소드를 사용하여 CSS_SELECTOR, ID 등을 정의할 수 있다.
driver.get(url)
list_items = driver.find_elements(By.CSS_SELECTOR, 'li')
print(len(list_items))
driver.get(url)
body = driver.find_element(By.CSS_SELECTOR, 'body#default')
body_ = driver.find_element(By.ID, 'default')
print(body_.text)
- 가져온 엘리먼트의 속성을 가져올 경우, 정의한 엘리먼트의 체이닝으로 get_attribute()를 정의한다.
driver.get(url)
img_src = driver.find_element(By.CSS_SELECTOR, 'article.product_pod div a img').get_attribute('src')
print(img_src)
3) XPATH 를 이용한 파싱 방법
- 기본 사용 방법
driver.get(url)
list_items = driver.find_elements(By.XPATH, '//li')
len(list_items)
driver.get(url)
prices = driver.find_elements(By.XPATH, '//p[@class="price_color"]')
for price in prices:
print(price.text)
print('\n')
4) 클래스 속성 가져오기
- 클래스 속성은 element() 의 두 번째 파라미터에 . 을 이용해 접근할 수 있다.
<div class="quote">
<span class="text">“The world as we have created it is a process of our thinking.
It cannot be changed without changing our thinking.”</span>
<span>by <small class="author">Albert Einstein</small></span>
<div class="tags">Tags:
<a class="tag">change</a>
<a class="tag">deep-thoughts</a>
<a class="tag">thinking</a>
<a class="tag">world</a>
</div>
</div>
- 위 html 구조에서 span에서 text 클래스를 가지고 있는 엘리먼트와 small 태그의 author 클래스를 가지고 있는 엘리먼트는 다음과 같은 방법으로 가져올 수 있다.
quote = element.find_element(By.CSS_SELECTOR, 'span.text').text
the_dict['quote'].append(quote)
author = element.find_element(By.CSS_SELECTOR, 'small.author').text
the_dict['author'].append(author)
5) CSS_SELECTOR 사용
1. 엘리먼트 선택 : 이름만 정의한다.
driver.find_elements(By.CSS_SELECTOR, 'a')
2. 클래스 선택 : .으로 정의한다.
driver.find_elements(By.CSS_SELECTOR, '.button')
3. ID 선택 : #으로 정의한다.
driver.find_element(By.CSS_SELECTOR, '#header')
4. 속성 선택 : [속성=값]으로 정의한다.
driver.find_element(By.CSS_SELECTOR, '[name="username"]')
5. 자식 선택 : 공백으로 구분하여 정의한다.
driver.find_elements(By.CSS_SELECTOR, 'ul li')
6. 가상 클래스 및 가상 엘리먼트 선택 : 가상 클래스 또는 가상 엘리먼트를 사용한다.
driver.find_element(By.CSS_SELECTOR, 'a:hover')
5) 무한 스크롤 제어
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
url = 'https://quotes.toscrape.com/scroll'
options = webdriver.ChromeOptions()
options.add_argument('--start-maximized')
options.add_experimental_option('detach', True)
driver = webdriver.Chrome(options=options)
driver.get(url)
def scroll_to_bottom():
driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
sleep(1)
old_height = 0
new_height = driver.execute_script('return document.body.scrollHeight')
while new_height != old_height:
scroll_to_bottom()
old_height = new_height
new_height = driver.execute_script('return document.body.scrollHeight')
quotes = driver.find_elements(By.CSS_SELECTOR, 'div.quote')
print(len(quotes))
6) 동적 페이지에서 특정 요소가 로드될 때까지 기다리는 방법
- WebDriverWait를 사용하면 요소가 로드될 때까지 기다릴 수 있다.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from time import sleep
url = 'https://quotes.toscrape.com/js-delayed'
options = webdriver.ChromeOptions()
options.add_argument('--start-maximized')
options.add_experimental_option('detach', True)
driver = webdriver.Chrome(options=options)
driver.implicitly_wait(3)
driver.get(url)
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div.quote'))
)
sleep(2)
quotes = driver.find_elements(By.CSS_SELECTOR, 'div.quote')
print(len(quotes))
7) 복잡한 사용자 상호작용을 자동화하기 위한 메소드 - ActionChains
- 프로젝트 코드
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
url = 'https://quotes.toscrape.com/js'
options = webdriver.ChromeOptions()
options.add_argument('--start-maximized')
options.add_experimental_option('detach', True)
driver = webdriver.Chrome(options=options)
actions = ActionChains(driver)
driver.get(url)
next_button = driver.find_element(By.CSS_SELECTOR, 'li.next a')
actions.move_to_element(next_button).perform()
next_button.click()
3. SELENIUM 3 - REAL LIFE EXAMPLE 2 (IMDB)
1) imdb 사이트를 크롤링 하기
1. 모든 스크립트가 종료되더라도 크라우저 창이 자동으로 닫히지 않게 하는 방법
options.add_experimental_option('detach', True)
2) 쿠키 활성화
1. 보통 Selenium을 사용하여 웹 페이지에 표시되는 쿠키 수락 배너에서 "수락" 버튼을 자동으로 클릭하는 사용자 정의 함수이다.
- 기본 예시
def accept_cookie(the_driver):
try:
accept_cookies = the_driver.find_element(By.XPATH, '//button[text()="Accept"]')
accept_cookies.click()
except:
pass
4. Scrapy1_Basics
1) 기본 사용 방법
1. 콘솔 명령어
1.1. scrapy startproject myproject
- 새 Scrapy 프로젝트를 생성한다.
- "myproject"라는 디렉토리와 함께 프로젝트에 필요한 기본 폴더(예: spiders, settings, items 등)와 파일들이 자동으로 생성된다.
1.2. scrapy genspider books.toscrape.com
- 새 스파이더(크롤러) 템플릿을 생성한다.
- "books.toscrape.com"이라는 이름의 스파이더 파일이 생성되며, 이 파일은 이후 해당 사이트를 크롤링하는 데 필요한 기본 코드와 구조를 포함한다.
1.3. scrapy view books.toscrape.com
- 지정한 URL(books.toscrape.com)의 페이지를 브라우저에서 확인할 수 있도록 연다.
- 이 명령어는 페이지의 HTML 소스나 렌더링 결과를 빠르게 확인하고 디버깅할 때 유용하다.
1.4. scrapy shell books.toscrape.com
- 해당 URL에 대해 Scrapy의 인터랙티브 셸을 실행한다.
- 이 셸을 통해 웹페이지의 응답(Response)을 가져와서 CSS 선택자, XPath 등으로 데이터 추출 방법을 실험하거나 디버깅할 수 있다.
2. CSS 선택자 사용법
2.1. 기본 사용법
- 구문: response.css('선택자')
- 선택자에 맞는 모든 요소를 선택하며, 결과는 SelectorList 객체이다.
- 예제:
# 모든 p 태그 중 class가 price_color인 요소 선택
elements = response.css('p.price_color')
2.2. 텍스트 추출
- 텍스트 추출: ::text
- 요소 내부의 텍스트 노드를 선택한다.
- 예제:
# price_color 클래스를 가진 p 태그의 텍스트만 추출
prices = response.css('p.price_color::text')
2.3. 속성(attribute) 추출
- 속성 추출: ::attr(속성명)
- HTML 요소의 특정 속성값을 직접 추출한다.
- 예제:
# a 태그의 href 속성을 추출
href_value = response.css('article.product_pod h3 a::attr(href)').get()
2.4. 주요 메소드
- .get()
- 선택된 결과 중 첫 번째 항목의 값을 문자열로 반환한다.
- 예:
first_price = response.css('p.price_color::text').get()
- .getall()
- 선택된 모든 항목을 리스트로 반환한다.
- 예:
all_prices = response.css('p.price_color::text').getall()
- .extract() / .extract_first()
- Scrapy 1.x 버전에서 사용되던 메소드로, 각각 getall()와 get()의 역할을 한다.
- 최신 버전에서는 get()와 getall()을 권장한다.
2.5. 선택자 객체의 속성 – .attrib
- .attrib
- 선택된 요소 객체의 속성 전체를 포함하는 딕셔너리이다.
- 즉, 선택자로 요소 자체를 반환한 후 .attrib를 통해 원하는 속성을 조회할 수 있다.
- 예제:
# 첫 번째 a 태그 객체에서 href 속성 값 추출
a_selector = response.css('article.product_pod h3 a')
href_value = a_selector.attrib['href']
3. XPath 선택자 사용법
3.1. 기본 사용법
- 구문: response.xpath('XPath 표현식')
- XPath 표현식을 사용하여 요소나 텍스트, 속성 등을 선택한다.
- 예제:
# class가 product_pod인 article 요소 내의 h3 > a 태그 선택
elements = response.xpath('//article[@class="product_pod"]/h3/a')
3.2. 텍스트 및 속성 추출
- 텍스트 추출:
- XPath에서 /text()를 사용해 텍스트 노드를 선택한다.
- 예제:
text_content = response.xpath('//p[@class="price_color"]/text()').get()
- 속성 추출:
- XPath에서 /@속성명으로 속성을 바로 선택할 수 있다.
- 예제:
href_value = response.xpath('//article[@class="product_pod"]/h3/a/@href').get()
3.3. 주요 메소드
- .get()
- 선택 결과 중 첫 번째 항목을 문자열로 반환한다.
- .getall()
- 선택 결과 전체를 리스트로 반환한다.
- .extract() / .extract_first()
- 이전 버전에서 사용되던 메소드로, 각각 getall()와 get()과 동일한 역할을 한다.
3.4. 선택자 객체의 속성 - .attrib
- XPath로 선택한 요소 객체도 .attrib 속성을 가진다.
- 예제:
a_selector = response.xpath('//article[@class="product_pod"]/h3/a')
href_value = a_selector.attrib['href']
4. attr와 attrib의 차이점
4.1. ::attr(속성명)
- CSS 선택자 내에 사용되는 가상 선택자(pseudo-element)이다.
- 선택 시 해당 속성의 값을 바로 추출하여 문자열(또는 리스트)로 반환한다.
- 예: response.css('a::attr(href)').get()
4.2. .attrib
- 선택된 Selector 객체의 Python 딕셔너리 속성이다.
- 요소에 포함된 모든 속성이 key-value 형태로 저장되어 있으며, 원하는 속성을 키를 통해 접근한다.
- 예: response.css('a').attrib['href']
4.3. 주요 차이점 요약:
방법 | 반환 형태 | 사용 시점 |
::attr('속성명') | 문자열 또는 리스트 | 선택자 내부에서 바로 속성 추출 |
.attrib['속성명'] | 문자열 | 요소 전체가 선택된 후 속성 딕셔너리에서 추출 |
5. 기타 관련 메소드 및 옵션
5.1 정규 표현식 적용 – re() 메소드
- .re() 메소드:
- 선택자 결과에 대해 정규 표현식을 적용하여 원하는 패턴의 데이터를 추출한다.
- 예제:
# price_color 클래스의 텍스트에서 숫자와 소수점만 추출 (예: '£51.77' → '51.77')
prices = response.css('p.price_color::text').re(r'\d+\.\d+')
5.2 선택자 객체 관련 기타 메소드
- .extract() / .extract_first()
- 앞서 언급했듯이, Scrapy 1.x에서 사용되던 메소드로 getall()와 get()과 동일한 역할을 한다.
- 현재 버전에서는 get()과 getall() 사용을 권장한다.
- .root
- 선택자 객체 내부의 원본 lxml 요소를 반환한다.
- 고급 사용자 및 디버깅 시 활용할 수 있다.
- 예제:
first_element_root = response.css('div.content').root
5.3 옵션 및 기타 팁
- 선택자 결합:
- CSS나 XPath를 혼합하여 사용할 수는 없으나, 두 가지 방법 중 편리한 것을 선택하여 사용할 수 있다.
- 디버깅:
- Scrapy 셸(scrapy shell [URL])을 활용하면 선택자 결과를 실시간으로 확인하며 개발할 수 있다.
- 예외 처리:
- .get()을 사용할 때 원하는 요소가 없으면 None을 반환하므로, 후속 처리 시 이를 고려해야 한다.
- 체이닝:
- 선택자를 체이닝하여 더욱 구체적인 요소를 선택할 수 있다.
- 예제:
# article 내의 a 태그의 텍스트 추출
link_text = response.css('article.product_pod').css('h3 a::text').get()
6. 샘플 코드 전체 예제
- 아래는 CSS와 XPath 선택자를 모두 활용한 간단한 예제 코드입니다.
# Scrapy 셸 또는 스파이더 내에서 실행
# CSS 선택자 예제
# 1. 가격 텍스트를 리스트로 추출
prices_css = response.css('p.price_color::text').getall()
print("CSS 가격 리스트:", prices_css)
# 2. 첫 번째 가격 텍스트 추출
first_price_css = response.css('p.price_color::text').get()
print("CSS 첫 번째 가격:", first_price_css)
# 3. a 태그의 href 속성 추출 (두 가지 방법)
href_css_method1 = response.css('article.product_pod h3 a::attr(href)').get()
href_css_method2 = response.css('article.product_pod h3 a').attrib['href']
print("CSS href (방법1):", href_css_method1)
print("CSS href (방법2):", href_css_method2)
# XPath 선택자 예제
# 1. 가격 텍스트 추출
first_price_xpath = response.xpath('//p[@class="price_color"]/text()').get()
print("XPath 첫 번째 가격:", first_price_xpath)
# 2. a 태그의 href 속성 추출 (두 가지 방법)
href_xpath_method1 = response.xpath('//article[@class="product_pod"]/h3/a/@href').get()
href_xpath_method2 = response.xpath('//article[@class="product_pod"]/h3/a').attrib['href']
print("XPath href (방법1):", href_xpath_method1)
print("XPath href (방법2):", href_xpath_method2)
# 정규 표현식 적용 예제
# 가격 텍스트에서 숫자만 추출
price_numbers = response.css('p.price_color::text').re(r'\d+\.\d+')
print("정규 표현식으로 추출한 가격:", price_numbers)
2) spider
1. 스파이더 개요
- 이 스파이더는 Scrapy 프레임워크를 사용하여 Books to Scrape 사이트에서 책 정보를 수집한다.
- 주요 작업은 다음과 같다.
- 시작 URL에서 요청:
- 스파이더는 시작 URL인 https://books.toscrape.com에서 크롤링을 시작한다.
- 목록 페이지 처리:
- 페이지에서 각 책의 링크를 추출한 후, 각 링크를 방문하여 상세 정보를 수집한다.
- 페이징 처리:
- 목록 페이지 하단에 있는 "다음 페이지" 링크를 찾아 재귀적으로 다음 페이지도 처리한다.
- 상세 페이지 처리:
- 각 책의 상세 페이지에서 제목, 가격(세금 포함/미포함), UPC, 재고 정보, 세금, 카테고리, 별점, 이미지 URL 등을 추출한다.
- 별점 처리:
- 별점은 CSS 클래스(star-rating)에 포함되어 있으며, 이를 단어(예: "One", "Two")로 받아 숫자 형태(예: "1", "2")로 변환한다.
- 데이터 저장: 수집한 정보를 BookItem 객체에 담아 다음 단계로 전달한다.
import scrapy
from ..items import BookItem
# 별점을 문자열(영어 단어)에서 숫자로 매핑하기 위한 딕셔너리
number_dict = {'One': '1', 'Two': '2', 'Three': '3', 'Four': '4', 'Five': '5'}
class BookSpider(scrapy.Spider):
# 스파이더의 이름을 정의 (실행 시 이름으로 호출)
name = "book"
# 크롤링 허용 도메인 설정
allowed_domains = ["books.toscrape.com"]
# 크롤링 시작 URL 목록
start_urls = ["https://books.toscrape.com"]
def parse(self, response):
"""
메인 페이지나 목록 페이지에서 책 상세 페이지 링크를 추출하고,
'다음 페이지'가 있으면 재귀적으로 요청을 보내는 함수.
"""
# 각 책의 상세 페이지로 연결되는 링크 추출 (CSS 선택자 사용)
book_links = response.css('article.product_pod h3 a::attr(href)').getall()
for book_link in book_links:
# 각각의 책 상세 페이지 링크로 follow 요청, 응답은 parse_books에서 처리
yield response.follow(book_link, callback=self.parse_books)
# '다음 페이지' 링크 추출
next_page_link = response.css('li.next a::attr(href)').get()
# 다음 페이지가 존재하면 다시 parse 함수로 요청하여 페이지 반복 처리
if next_page_link is not None:
yield response.follow(next_page_link, callback=self.parse)
def parse_books(self, response):
"""
책 상세 페이지에서 필요한 정보를 추출하는 함수.
"""
# BookItem 객체를 생성하여 데이터를 저장할 준비
book = BookItem()
# 책 제목 추출
book['name'] = response.css('div.product_main h1::text').get()
# 세금 제외 가격 추출 (XPath 사용)
book['price_exc_tax'] = response.xpath('//th[text()="Price (excl. tax)"]/following-sibling::td/text()').get()
# 세금 포함 가격 추출 (XPath 사용)
book['price_inc_tax'] = response.xpath('//th[text()="Price (incl. tax)"]/following-sibling::td/text()').get()
# UPC(고유 상품 코드) 추출
book['upc'] = response.xpath('//th[text()="UPC"]/following-sibling::td/text()').get()
# 재고 정보(Availability) 추출
book['availability'] = response.xpath('//th[text()="Availability"]/following-sibling::td/text()').get()
# 세금 정보 추출
book['tax'] = response.xpath('//th[text()="Tax"]/following-sibling::td/text()').get()
# 카테고리 추출: breadcrumb 내의 두 번째 링크(실제로는 세 번째 항목)를 사용
category_children = response.xpath('//ul[@class="breadcrumb"]/child::*')
book['category'] = category_children[2].css('a::text').get()
# 별점 추출: <p class="star-rating Three"> 형식의 태그에서 별점 정보를 얻음
star_tag = response.css('p.star-rating')
class_name_string = star_tag.attrib['class']
# 공백으로 구분된 클래스 이름 중 마지막 항목이 별점을 의미 (예: "Three")
stars = class_name_string.split(' ')[-1]
# 딕셔너리를 통해 숫자로 변환하여 저장
book['stars'] = number_dict[stars]
# 이미지 URL 추출: 상대경로를 절대경로로 변환하기 위해 앞부분의 URL을 추가
# 이미지 태그의 src 속성은 "../../media/cache/..." 형태이므로 [5:]로 "../.." 부분을 제거
book['image_url'] = 'https://books.toscrape.com' + response.css('div.active img').attrib['src'][5:]
# 최종적으로 수집한 데이터를 yield하여 파이프라인 등으로 전달
yield book
3) items
1. 주요 특징
- 구조화된 데이터 저장
- Items를 사용하면 크롤링할 데이터의 필드를 미리 정의할 수 있다.
- 예를 들어, 도서 정보라면 제목, 가격, 재고 등 필요한 항목들을 선언해두어 일관성 있는 데이터 수집이 가능하다.
- 타입 안정성 및 명세 제공
- Items는 클래스 형태로 정의되므로, 각 필드에 대해 명시적으로 scrapy.Field()를 사용하여 데이터 구조를 문서화할 수 있다.
- 파이프라인과의 통합
- Scrapy의 아이템 파이프라인은 수집한 Item 객체를 받아 다양한 후처리(클렌징, 저장, 변환 등)를 적용할 수 있다.
- 이를 통해 데이터가 최종 저장소(예: 데이터베이스, 파일 등)로 전송되기 전 일련의 처리가 가능해진다.
- 유연성
- 기본적으로 딕셔너리와 유사하게 동작하기 때문에 익숙한 방식으로 접근할 수 있으며, 필요한 경우 추가 메타데이터나 유효성 검사를 위한 커스텀 로직을 구현할 수도 있다.
2. Items 정의 예시
- 보통 items.py 파일에 Items 클래스를 정의한다. 예를 들어, 도서 정보를 저장할 Item은 아래와 같이 작성할 수 있다.
import scrapy
# 도서 정보를 저장하기 위한 Item 클래스
class BookItem(scrapy.Item):
# 도서 제목을 저장하는 필드
title = scrapy.Field()
# 가격 정보를 저장하는 필드
price = scrapy.Field()
# 재고 상태를 저장하는 필드
availability = scrapy.Field()
# 기타 필요한 필드들을 추가로 정의 가능
4) pipelines
1. Scrapy Pipelines 개요
- Scrapy의 파이프라인은 스파이더가 수집한 Item 데이터를 후처리하는 일련의 컴포넌트이다.
- 파이프라인은 다음과 같은 역할을 한다.
- 데이터 정제 및 가공:
- 불필요한 공백 제거, 데이터 형식 변환, 대문자/소문자 변환 등 데이터를 정리한다.
- 유효성 검사:
- 수집된 데이터가 요구 조건에 부합하는지 확인하고, 조건에 맞지 않으면 해당 아이템을 제거할 수 있다.
- 저장 및 내보내기:
- 데이터를 파일(Excel, CSV 등)이나 데이터베이스(SQLite, MySQL 등)에 저장한다.
- 여러 파이프라인 체인:
- 설정에서 여러 파이프라인을 순차적으로 적용할 수 있으며, 각 파이프라인은 순서대로 아이템을 처리한다.
- 각 파이프라인 클래스는 process_item(self, item, spider) 메서드를 구현하며, 아이템을 가공한 후 반환하거나, DropItem 예외를 발생시켜 아이템을 제거할 수 있다. 또한, open_spider와 close_spider 메서드를 활용해 스파이더 시작 시와 종료 시 초기화 작업 및 정리 작업을 수행할 수 있다.
2. 코드 상세 설명 및 주석
- 다음 코드는 총 4개의 파이프라인 클래스를 정의한다. 각 파이프라인은 수집한 도서(Item)를 다르게 처리한다.
2.1. BasicsPipeline
- 아이템의 기본적인 데이터를 정제하는 파이프라인이다.
class BasicsPipeline:
def process_item(self, item, spider):
# ItemAdapter를 사용해 item에 접근하면 dict처럼 다룰 수 있음
adapter = ItemAdapter(item)
# 'name' 필드를 모두 대문자로 변환
adapter['name'] = adapter['name'].upper()
# 'availability' 필드에서 괄호 안의 숫자(재고 개수)를 추출
available_no = adapter.get('availability').split('(')[-1].split(' ')[0]
adapter['availability'] = available_no
# 가격 관련 필드의 화폐 단위를 파운드(£)에서 달러($)로 변경
adapter['price_exc_tax'] = adapter.get('price_exc_tax').replace('£', '$')
adapter['price_inc_tax'] = adapter.get('price_inc_tax').replace('£', '$')
adapter['tax'] = adapter.get('tax').replace('£', '$')
# 가공된 아이템을 다음 파이프라인이나 최종 저장 단계로 반환
return item
2.2. 주요 포인트:
- ItemAdapter를 사용해 아이템에 안전하게 접근한다.
- 이름을 대문자로 변환하여 일관성을 부여한다.
- 재고 정보를 문자열 파싱하여 순수한 숫자 정보만 남긴다.
- 가격 정보에서 화폐 기호를 변경하여 통일성을 유지한다.
3. DropperPipeline
- 재고(stock)가 충분하지 않은 아이템을 필터링(제거)하는 파이프라인이다.
class DropperPipeline:
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# availability에서 재고 숫자 추출
available_no = adapter.get('availability').split('(')[-1].split(' ')[0]
# 재고가 10 이상인 경우에만 아이템을 반환하고, 그렇지 않으면 DropItem 예외 발생
if int(available_no) >= 10:
# 재고가 충분하면, 기본적인 가격 단위 변경을 수행
adapter['availability'] = available_no
adapter['price_exc_tax'] = adapter.get('price_exc_tax').replace('£', '$')
adapter['price_inc_tax'] = adapter.get('price_inc_tax').replace('£', '$')
adapter['tax'] = adapter.get('tax').replace('£', '$')
return item
else:
# 재고 부족 시 해당 아이템을 파이프라인에서 제거
raise DropItem(f'Not enough stock for {adapter.get("name")}')
3.1. 주요 포인트:
- 재고 수치를 정수로 변환하여 재고가 10 이상인 경우에만 아이템을 통과시킨다.
- 재고가 부족한 아이템은 DropItem 예외를 발생시켜 제거한다.
4. ExcelPipeline
- 아이템을 Excel 파일로 저장하는 파이프라인이다.
class ExcelPipeline:
def open_spider(self, spider):
# 스파이더 시작 시, 새로운 Excel 워크북 생성
self.workbook = openpyxl.Workbook()
self.sheet = self.workbook.active
self.sheet.title = 'Books'
# Excel 파일의 첫 번째 행에 컬럼 이름을 추가
self.sheet.append(["name", "price_exc_tax", "price_inc_tax",
"category", "stars", "upc", "tax", "availability",
"image_url"])
def process_item(self, item, spider):
# 각 아이템의 필드를 Excel의 새로운 행에 추가
self.sheet.append([item.get('name'),
item.get('price_exc_tax'),
item.get('price_inc_tax'),
item.get('category'),
item.get('stars'),
item.get('upc'),
item.get('tax'),
item.get('availability'),
item.get('image_url'),
])
return item
def close_spider(self, spider):
# 스파이더 종료 시, Excel 파일 저장
self.workbook.save('books_.xlsx')
4.1. 주요 포인트:
- open_spider: 스파이더 시작 시 Excel 워크북 및 시트를 생성하고, 헤더(컬럼 이름)를 추가한다.
- process_item: 각 아이템의 데이터를 Excel 시트에 행(row)으로 추가한다.
- close_spider: 스파이더 종료 시 Excel 파일을 저장한다.
5. SQLitePipeline
- 아이템을 SQLite 데이터베이스에 저장하는 파이프라인이다.
class SQLitePipeline:
def open_spider(self, spider):
# 스파이더 시작 시, SQLite 데이터베이스에 연결
self.connection = sqlite3.connect("bookdatabase.db")
self.cursor = self.connection.cursor()
# 데이터베이스에 테이블이 존재하지 않으면 새 테이블 생성
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS booktable (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
price_exc_tax TEXT,
price_inc_tax TEXT,
category TEXT,
stars TEXT,
upc TEXT,
tax TEXT,
availability TEXT,
image_url TEXT
)
''')
self.connection.commit()
def process_item(self, item, spider):
# 아이템 데이터를 데이터베이스 테이블에 삽입
self.cursor.execute('''
INSERT INTO booktable (
name, price_exc_tax, price_inc_tax, category, stars, upc, tax, availability, image_url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
item['name'],
item['price_exc_tax'],
item['price_inc_tax'],
item['category'],
item['stars'],
item['upc'],
item['tax'],
item['availability'],
item['image_url']
))
self.connection.commit()
return item
def close_spider(self, spider):
# 스파이더 종료 시 데이터베이스 연결을 종료
self.connection.close()
5.1. 주요 포인트:
- open_spider: SQLite 데이터베이스에 연결하고, 저장할 테이블(booktable)이 없으면 생성한다.
- process_item: 각 아이템을 데이터베이스에 INSERT 쿼리를 사용하여 저장한다.
- close_spider: 스파이더가 종료될 때 데이터베이스 연결을 닫아 자원을 정리한다.
5) middleware
1. 코드 원본
from scrapy import signals
import random
# 다양한 아이템 타입을 하나의 인터페이스로 다루기 위해 사용 (현재 코드에서는 사용하지 않지만, 확장성을 위해 임포트)
from itemadapter import is_item, ItemAdapter
# 사용자 에이전트와 프록시를 함께 회전시키는 미들웨어 클래스
class RotateUserAgentAndProxyMiddleware:
def __init__(self, user_agents, proxies):
# 생성자: 설정 파일에서 받아온 사용자 에이전트와 프록시 리스트를 인스턴스 변수에 저장
self.user_agents = user_agents
self.proxies = proxies
@classmethod
def from_crawler(cls, crawler):
# Scrapy가 미들웨어 인스턴스를 생성할 때 호출하는 클래스 메서드
# 설정 파일에서 'USER_AGENT_LIST'와 'PROXY_LIST'를 가져와서 인스턴스를 생성합니다.
user_agents = crawler.settings.getlist('USER_AGENT_LIST')
proxies = crawler.settings.getlist('PROXY_LIST')
return cls(user_agents, proxies)
def process_request(self, request, spider):
# 각 요청(Request)이 다운로더로 전달되기 전에 호출되는 메서드.
# 랜덤하게 사용자 에이전트와 프록시를 선택하여 요청의 헤더와 메타정보에 적용합니다.
user_agent = random.choice(self.user_agents)
proxy = random.choice(self.proxies)
request.headers['User-Agent'] = user_agent # 요청 헤더에 User-Agent 설정
request.meta['proxy'] = proxy # 요청 메타에 프록시 설정
spider.logger.info(f'Using user-agent: {user_agent}, proxy: {proxy}')
def process_response(self, request, response, spider):
# 다운로더에서 받은 응답(Response)이 스파이더로 전달되기 전에 호출되는 메서드.
# 응답 상태 코드가 200이 아닌 경우, 새로운 사용자 에이전트와 프록시로 요청을 재시도합니다.
if response.status != 200:
user_agent = random.choice(self.user_agents)
proxy = random.choice(self.proxies)
spider.logger.info(f'Changing user-agent to: {user_agent}, proxy to: {proxy}, retrying...')
new_request = request.copy() # 기존 요청을 복사하여 재시도할 새 요청 생성
new_request.headers['User-Agent'] = user_agent
new_request.meta['proxy'] = proxy
new_request.dont_filter = True # 중복 요청 필터링을 피하기 위해 설정
return new_request # 새 요청 반환: 이 경우 응답은 사용되지 않음
return response # 상태 코드가 200이면 기존 응답을 그대로 반환
def process_exception(self, request, exception, spider):
# 요청 처리 중 예외가 발생했을 때 호출되는 메서드.
# 예외 발생 시, 새로운 사용자 에이전트와 프록시로 요청을 재시도하도록 합니다.
user_agent = random.choice(self.user_agents)
proxy = random.choice(self.proxies)
spider.logger.info(f'Exception: {exception}. Changing user-agent to: {user_agent}, proxy to: {proxy}, retrying...')
new_request = request.copy() # 기존 요청 복사
new_request.headers['User-Agent'] = user_agent
new_request.meta['proxy'] = proxy
new_request.dont_filter = True # 중복 요청 필터링 방지
return new_request # 새로운 요청을 반환하여 재시도하도록 함
# 사용자 에이전트만 회전시키는 미들웨어 클래스
class RotateUserAgentMiddleware:
def __init__(self, user_agents):
# 생성자: 설정 파일에서 받아온 사용자 에이전트 리스트를 저장합니다.
self.user_agents = user_agents
@classmethod
def from_crawler(cls, crawler):
# Scrapy가 미들웨어 인스턴스를 생성할 때 호출하는 클래스 메서드.
# 설정 파일에서 'USER_AGENT_LIST'를 가져와 인스턴스를 생성합니다.
user_agents = crawler.settings.getlist('USER_AGENT_LIST')
return cls(user_agents)
def process_request(self, request, spider):
# 각 요청(Request)이 다운로더로 전달되기 전에 호출되는 메서드.
# 랜덤하게 사용자 에이전트를 선택하여 요청 헤더에 적용합니다.
user_agent = random.choice(self.user_agents)
request.headers['User-Agent'] = user_agent # 요청 헤더에 선택된 사용자 에이전트 적용
spider.logger.info(f'Using user-agent: {user_agent}')
2. 전체 동작 설명
2.1. RotateUserAgentAndProxyMiddleware
- 초기화 및 설정
- from_crawler 메서드를 통해 Scrapy 설정 파일에서 USER_AGENT_LIST와 PROXY_LIST를 가져와 미들웨어 인스턴스를 생성한다.
- 이로써 여러 개의 사용자 에이전트와 프록시가 리스트 형태로 저장된다.
- 요청 전처리 (process_request)
- 각 요청이 다운로더로 전달되기 전에 호출된다.
- 미들웨어는 저장된 리스트에서 랜덤으로 사용자 에이전트와 프록시를 선택한다.
- 선택된 사용자 에이전트는 request.headers에, 프록시는 request.meta['proxy']에 할당된다.
- 이러면 매 요청마다 다른 사용자 에이전트와 프록시를 사용하여, 서버로부터 차단당할 위험을 줄일 수 있다.
- 응답 처리 (process_response)
- 다운로더가 응답을 받은 후, 스파이더로 전달되기 전에 호출된다.
- 만약 응답의 상태 코드가 200(정상)이 아니면, 미들웨어는 새로운 사용자 에이전트와 프록시를 선택하고, 기존 요청을 복사하여 새로운 요청을 생성한다.
- 새 요청에 대해 dont_filter = True를 설정하여, Scrapy의 중복 필터링을 우회한 후 다시 시도하도록 한다.
- 정상 응답일 경우, 응답을 그대로 반환한다.
- 예외 처리 (process_exception)
- 요청 처리 도중 예외가 발생했을 때 호출된다.
- 예외 발생시, 새로운 사용자 에이전트와 프록시를 선택하고, 기존 요청을 복사하여 새로운 요청으로 대체한다.
- 이로써 예외 상황에서도 요청을 재시도할 수 있도록 처리한다.
2.2. RotateUserAgentMiddleware
- 초기화 및 설정
- from_crawler를 통해 Scrapy 설정 파일에서 USER_AGENT_LIST를 가져와 미들웨어 인스턴스를 생성한다.
- 프록시 설정은 포함하지 않고 사용자 에이전트 리스트만 저장한다.
- 요청 전처리 (process_request)
- 각 요청이 다운로더로 전달되기 전에 호출된다.
- 저장된 사용자 에이전트 리스트에서 랜덤하게 하나를 선택하여, 요청 헤더의 User-Agent 값으로 설정한다.
- 이 미들웨어는 프록시 처리를 하지 않으므로, 단순히 사용자 에이전트만 회전시킨다.
5. SCRAPY 3 - REAL LIFE EXAMPLE 3 (FLYING TIGER)
1) 프로젝트 구조
Scrapy3_RealLifeProject3/
├── scrapy_real_life_1/
│ ├── tiger/
│ │ ├── tiger/
│ │ │ ├── __init__.py
│ │ │ ├── items.py
│ │ │ ├── pipelines.py
│ │ │ └── spiders/
│ │ │ ├── __init__.py
│ │ │ └── images.py
│ │ └── scrapy.cfg
│ └── ...
└── ...
2) spiders
1. images.py
import scrapy
from ..items import TigerItem, ImageItem # items 폴더에서 정의한 두 가지 아이템(TigerItem, ImageItem)을 임포트
class ImageSpider(scrapy.Spider):
# 스파이더 이름 정의 (실행 시 이 이름으로 호출)
name = "image"
# 스파이더가 접근 가능한 도메인 리스트. 크롤링 시 이 도메인들 외에는 접근하지 않음.
allowed_domains = ["flyingtiger.com"]
# 스파이더가 크롤링을 시작할 URL 리스트
start_urls = ["https://flyingtiger.com/collections/shop-all"]
# 사용자 정의 설정: 아이템 파이프라인과 이미지 저장 경로 설정
custom_settings = {
'ITEM_PIPELINES': {
'tiger.pipelines.JsonWriterPipeline': 10, # JSON 파일로 저장하는 파이프라인 우선순위 10
'tiger.pipelines.CustomImagesPipeline': 20, # 이미지 다운로드 파이프라인 우선순위 20
},
'IMAGES_STORE': 'tiger_images' # 다운로드 받은 이미지를 저장할 로컬 디렉토리 경로
}
def parse(self, response):
"""
start_urls에서 받은 응답(response)을 파싱하는 기본 메서드.
이 메서드에서는 제품 리스트 페이지에서 각 제품의 URL을 추출하고,
'다음 페이지'가 있다면 재귀적으로 파싱을 계속 진행함.
"""
# 제품 리스트가 포함된 ul 요소 선택 (id가 'product-grid')
container = response.css('ul#product-grid')
# ul 내부의 각 제품 링크(a 태그, class: card__information--title) 선택
items_a_tags = container.css('a.card__information--title')
# 각 제품 링크에 대해 처리
for a_tag in items_a_tags:
# a 태그의 href 속성 값을 이용해 전체 제품 상세 페이지 URL 구성
product_url = 'https://flyingtiger.com' + a_tag.attrib['href']
# 제품 상세 페이지에 대한 Request를 생성하고, 응답은 parse_items 메서드로 처리
yield scrapy.Request(product_url, callback=self.parse_items)
# 페이지네이션: '다음' 버튼(a 태그, aria-label 속성이 "Next") 선택
next_page_link = response.css('a[aria-label="Next"]')
# 만약 '다음 페이지' 링크가 존재하면, 해당 URL로 이동하여 다시 parse 메서드로 파싱 시작
if next_page_link:
yield response.follow(next_page_link.attrib['href'], callback=self.parse)
def parse_items(self, response):
"""
제품 상세 페이지(response)를 파싱하는 메서드.
제품의 이름, 가격, 상품 코드, 이미지 URL 등 상세 정보를 추출하여 TigerItem에 저장하고,
추가적으로 이미지 URL과 상품 코드를 ImageItem에 저장하여 별도 파이프라인에서 이미지 처리를 진행.
"""
# TigerItem 인스턴스 생성 (제품 정보 저장용)
tiger_item = TigerItem()
# 제품 이름 추출: h1 태그에 class 'title'의 텍스트를 가져와 공백 제거
tiger_item['name'] = response.css('h1.title::text').get().strip()
# 제품 가격 추출: span 태그에 class 'price-item'의 텍스트를 가져와 공백 제거
tiger_item['price'] = response.css('span.price-item::text').get().strip()
# 제품 코드 추출: div 태그에 class 'product__sku'의 텍스트를 가져와 ':' 기준으로 분리 후 마지막 값을 사용하고 공백 제거
# 예시: "Product code: 3059034" 에서 "3059034"를 추출
tiger_item['product_code'] = response.css('div.product__sku::text').get().split(':')[-1].strip()
# 제품 이미지 URL 추출: div 태그에 class 'product__media' 내의 첫 번째 img 태그의 src 속성을 가져옴.
# '?' 이후의 파라미터들을 제거한 후, 'https:' 프로토콜을 붙여서 완전한 URL을 구성
tiger_item['image_url'] = ('https:' +
response.css('div.product__media img').attrib['src'].split('?')[0])
# 현재 제품 상세 페이지의 URL 저장
tiger_item['product_url'] = response.url
# 완성된 제품 정보를 yield하여 파이프라인으로 전달
yield tiger_item
# ImageItem 인스턴스 생성 (이미지 다운로드 처리를 위한 별도 아이템)
image_item = ImageItem()
# 위와 동일하게 이미지 URL을 추출하여 ImageItem에 저장
image_item['image_url'] = ('https:' +
response.css('div.product__media img').attrib['src'].split('?')[0])
# 제품 코드도 다시 추출하여 ImageItem에 저장 (나중에 이미지와 제품 정보를 매핑하기 위함)
image_item['product_code'] = response.css('div.product__sku::text').get().split(':')[-1].strip()
# 완성된 이미지 정보를 yield하여 파이프라인으로 전달
yield image_item
1.1. 코드의 주요 동작 요약
- 초기 설정:
- name, allowed_domains, start_urls를 통해 스파이더의 기본 정보를 설정
- custom_settings로 아이템 파이프라인과 이미지 저장 경로를 지정하여, 크롤링 후 JSON 파일 저장 및 이미지 다운로드를 처리
- parse 메서드:
- 제품 리스트 페이지(shop-all)에서 제품 링크들을 선택하여 각 상세 페이지로 Request를 보냄
- 페이지 하단의 '다음 페이지' 링크가 있다면, 재귀적으로 파싱을 이어감
- parse_items 메서드:
- 각 제품 상세 페이지에서 제품의 이름, 가격, 상품 코드, 이미지 URL 등을 추출
- 추출한 정보를 TigerItem에 저장 후 yield
- 이미지 URL과 상품 코드를 별도로 ImageItem에 저장하여 yield함으로써, 이미지 처리 파이프라인에서 사용
2. product.py
import scrapy
from ..items import TigerItem # items 모듈에서 TigerItem 클래스를 임포트
class ProductSpider(scrapy.Spider):
# 스파이더 이름 설정 (실행 시 이 이름으로 호출됨)
name = "product"
# 크롤링할 수 있는 도메인 제한 (이 도메인 이외의 사이트로는 접근하지 않음)
allowed_domains = ["flyingtiger.com"]
# 크롤링 시작 URL 목록 (제품 목록 페이지)
start_urls = ["https://flyingtiger.com/collections/shop-all"]
def parse(self, response):
"""
제품 목록 페이지의 응답(response)을 파싱하는 메서드.
이 메서드에서는 제품 목록에서 각 제품의 상세 페이지 URL을 추출하고,
'다음 페이지'가 있으면 페이지네이션 처리를 수행하여 재귀적으로 파싱을 진행한다.
"""
# 1. 제품 목록을 포함한 ul 태그 선택 (id가 'product-grid')
container = response.css('ul#product-grid')
# 2. 각 제품의 상세 정보 링크가 포함된 a 태그 선택 (클래스 이름: card__information--title)
items_a_tags = container.css('a.card__information--title')
# 3. 선택한 각 a 태그를 순회하며 제품 상세 페이지 URL을 구성하고 요청 생성
for a_tag in items_a_tags:
# a 태그의 href 속성 값에 base URL을 추가하여 전체 제품 상세 페이지 URL 구성
product_url = 'https://flyingtiger.com' + a_tag.attrib['href']
# 제품 상세 페이지 URL로 요청을 보내고, 응답은 parse_items 메서드로 처리
yield scrapy.Request(product_url, callback=self.parse_items)
# 4. 페이지네이션 처리: '다음' 페이지 링크(a 태그, aria-label="Next") 선택
next_page_link = response.css('a[aria-label="Next"]')
if next_page_link:
# '다음 페이지' 링크의 href 속성 값을 이용해 다음 페이지로 이동하는 요청 생성
yield response.follow(next_page_link.attrib['href'], callback=self.parse)
def parse_items(self, response):
"""
제품 상세 페이지의 응답(response)을 파싱하는 메서드.
이 메서드에서는 제품의 상세 정보를 추출하여 TigerItem 객체에 저장한다.
추출 정보에는 제품 이름, 가격, 제품 코드, 이미지 URL, 그리고 제품 상세 페이지 URL이 포함된다.
"""
# 1. TigerItem 인스턴스 생성 (추출한 데이터를 저장할 객체)
tiger_item = TigerItem()
# 2. 제품 이름 추출:
# h1 태그의 class 'title'에서 텍스트를 가져와 좌우 공백 제거
tiger_item['name'] = response.css('h1.title::text').get().strip()
# 3. 제품 가격 추출:
# span 태그의 class 'price-item'에서 텍스트를 가져와 좌우 공백 제거
tiger_item['price'] = response.css('span.price-item::text').get().strip()
# 4. 제품 코드 추출:
# div 태그의 class 'product__sku'에서 텍스트를 가져와 ':'를 기준으로 분리한 뒤 마지막 요소(실제 코드)를 선택하고, 공백 제거
# 예: "Product code: 3059034" → "3059034"
tiger_item['product_code'] = response.css('div.product__sku::text').get().split(':')[-1].strip()
# 5. 제품 이미지 URL 추출:
# div 태그의 class 'product__media' 내부의 img 태그에서 src 속성을 가져오고,
# URL에 포함된 추가 파라미터(예: ?v=1717051519&width=1200)를 제거하기 위해 split('?') 사용,
# 그리고 'https:' 프로토콜을 앞에 붙여 완전한 URL로 만듦.
tiger_item['image_url'] = ('https:' +
response.css('div.product__media img').attrib['src'].split('?')[0])
# 6. 제품 상세 페이지의 URL 저장 (현재 응답을 받은 URL)
tiger_item['product_url'] = response.url
# 7. 완성된 제품 정보를 yield하여 아이템 파이프라인으로 전달
yield tiger_item
2.1. 코드의 주요 동작 요약
- 초기 설정:
- name, allowed_domains, start_urls를 통해 스파이더의 기본 정보와 시작 지점을 설정한다.
- parse 메서드:
- 제품 목록 페이지에서 ul 태그(id: product-grid) 내부의 각 제품 링크(a 태그, class: card__information--title)를 선택한다.
- 각 링크에 대해 전체 URL을 구성하여 제품 상세 페이지 요청을 보낸다.
- 페이지 하단에 '다음 페이지' 링크(aria-label="Next")가 있으면, 따라가며 추가 제품 목록 페이지를 파싱한다.
- parse_items 메서드:
- 각 제품 상세 페이지에서 제품 이름, 가격, 제품 코드, 이미지 URL, 그리고 현재 페이지 URL을 추출한다.
- 추출된 데이터는 TigerItem 객체에 저장되어 아이템 파이프라인으로 전달된다.
3) pipelines
# 파이프라인들을 정의하는 모듈입니다.
# 반드시 ITEM_PIPELINES 설정에 해당 파이프라인들을 추가해야 스크래피가 실행 시 파이프라인 처리를 진행합니다.
import scrapy
from itemadapter import ItemAdapter
import json
from .items import TigerItem, ImageItem # 두 가지 아이템(TigerItem, ImageItem)을 임포트
from scrapy.pipelines.images import ImagesPipeline # 스크래피 기본 이미지 파이프라인을 상속받기 위함
class TigerPipeline:
# 기본 파이프라인: 들어온 아이템을 그대로 반환 (추후 추가 처리를 위한 템플릿)
def process_item(self, item, spider):
return item
class JsonWriterPipeline:
# 스파이더가 시작될 때 호출되는 메서드 (파일 열기)
def open_spider(self, spider):
# flying_tiger_data.json 파일을 쓰기 모드로 열고 UTF-8 인코딩 설정
self.file = open('flying_tiger_data.json', 'w', encoding='utf-8')
# JSON 배열의 시작 부분 기록
self.file.write('[\n')
# 각 아이템이 파이프라인을 통과할 때 호출되는 메서드
def process_item(self, item, spider):
# 아이템이 TigerItem 인스턴스인 경우에만 JSON으로 기록
if isinstance(item, TigerItem):
# dict(item)로 아이템을 딕셔너리 형태로 변환한 후, JSON 문자열로 변환
# ensure_ascii=False로 설정하여 한글 등 비 ASCII 문자도 올바르게 저장
line = json.dumps(dict(item), ensure_ascii=False) + ',\n'
# 변환된 JSON 문자열을 파일에 기록
self.file.write(line)
# 아이템을 반환하여 다음 파이프라인이나 최종 결과로 전달
return item
else:
# TigerItem이 아닌 경우 그냥 아이템 반환 (저장하지 않음)
return item
# 스파이더가 종료될 때 호출되는 메서드 (파일 닫기)
def close_spider(self, spider):
# JSON 배열의 종료 부분 기록
self.file.write('\n]')
# 파일 닫기
self.file.close()
class CustomImagesPipeline(ImagesPipeline):
# 스크래피의 ImagesPipeline을 상속받아 사용자 정의 이미지 파이프라인을 구현
# 각 아이템에 대해 다운로드할 이미지 요청을 생성하는 메서드
def get_media_requests(self, item, info):
# 아이템이 ImageItem 인스턴스인 경우에만 처리
if isinstance(item, ImageItem):
# 이미지 URL로 스크래피 Request를 생성, meta에 아이템 정보를 저장하여 이후 file_path 메서드에서 활용
yield scrapy.Request(item['image_url'], meta={'item': item})
# 다운로드된 이미지의 저장 경로를 설정하는 메서드
def file_path(self, request, response=None, info=None, *, item=None):
# request의 meta에서 아이템 정보를 가져와 제품 코드(product_code)를 파일 이름으로 사용
image_name = request.meta['item']['product_code']
# {product_code}.jpg 형식의 파일명 반환
return f'{image_name}.jpg'
3. 코드의 주요 동작 요약
3.1. TigerPipeline
- 역할: 단순히 들어온 아이템을 아무런 처리 없이 그대로 반환한다.
- 사용 이유: 기본 파이프라인의 틀을 제공하거나, 추후에 추가적인 처리를 위해 확장할 수 있다.
3.2. JsonWriterPipeline
- 역할: 크롤링한 TigerItem 데이터를 JSON 파일로 저장한다.
- 동작 순서:
- open_spider: 스파이더 실행 시 파일을 열고 JSON 배열의 시작([\n)을 기록한다.
- process_item: 각 아이템이 TigerItem 인스턴스인 경우, 이를 JSON 형식으로 변환하여 파일에 기록한다. 다른 타입의 아이템은 그대로 반환한다.
- close_spider: 스파이더 종료 시 JSON 배열의 끝(\n])을 기록하고 파일을 닫는다.
3.3. CustomImagesPipeline
- 역할: ImageItem에 저장된 이미지 URL을 사용하여 이미지를 다운로드합니다.
- 동작 순서:
- get_media_requests: 아이템이 ImageItem 인스턴스인 경우, 해당 이미지 URL로 스크래피 요청을 생성한다.
- 요청 시 meta에 아이템 정보를 저장해 후속 처리에 활용한다.
- file_path: 다운로드된 이미지를 저장할 파일 경로를 설정한다.
- 여기서는 아이템의 product_code를 파일명으로 사용하여 {product_code}.jpg 형식으로 저장한다.
4) settings.py
- 로그 설정
LOG_LEVEL = "ERROR"
LOG_FILE = "scrapy_error.log"
6. SCRAPY 4 - REAL LIFE EXAMPLE 4 (YELP)
1) 프로젝트 구조
Scrapy4_RealLifeProject4/
├── scrapy_real_life_4/
│ ├── my_spider/
│ │ ├── __init__.py
│ │ ├── items.py
│ │ ├── pipelines.py
│ │ ├── settings.py
│ │ └── spiders/
│ │ ├── __init__.py
│ │ └── example_spider.py
│ └── scrapy.cfg
└── ...
2) spider
1. yelpcrawler.py
- 코드 동작 개요
- 클래스 및 기본 설정
- YelpcrawlerSpider 클래스는 Scrapy의 CrawlSpider를 상속받아 Yelp에서 Gyms 관련 정보를 크롤링한다.
- allowed_domains를 통해 크롤링할 도메인을 "yelp.com"으로 제한한다.
- start_urls에 Yelp의 Berlin, Germany 지역 헬스장 검색 결과 페이지 URL을 지정한다.
- 크롤링 규칙 (Rules)
- 두 가지 규칙을 사용하여 링크를 추출한다.
- 첫 번째 규칙은 URL에 "desc=Gyms"와 "start="가 포함된 페이지들을 추출해 페이지네이션을 처리하며, 추가 링크들을 따라간다.
- 두 번째 규칙은 URL에 "biz/"와 "osq=Gyms" 패턴이 있는 비즈니스 상세 페이지를 대상으로 하며, URL에 'hrid'가 포함된 경우는 제외한다. 이 규칙은 해당 상세 페이지에 도달했을 때 parse_item 콜백 함수를 실행한다.
- 상세 페이지 데이터 추출 (parse_item 메서드)
- 페이지에서 헬스장 이름, 현재 페이지 URL, 비즈니스 웹사이트 링크, 전화번호, 주소 등의 정보를 추출한다.
- 웹사이트 링크 처리:
- "Business website"라는 텍스트가 포함된 <p> 태그의 바로 다음 <p> 태그 내 <a> 태그에서 링크 정보를 추출한다.
- 만약 링크 텍스트가 '…'로 끝난다면, href 속성의 값을 사용하여 실제 웹사이트 링크로 판단한다. 그렇지 않으면 텍스트 값을 그대로 사용한다.
- 전화번호, 주소:
- "Phone number"와 "Get Directions" 관련 태그에서 정보를 추출하며, 정보가 없으면 'No info'로 설정한다.
- 최종 출력:
- 추출된 데이터는 딕셔너리 형태로 yield되어 후속 파이프라인이나 저장 과정에서 사용된다.
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
# CrawlSpider를 상속받아 Yelp에서 헬스장 관련 정보를 크롤링하는 스파이더 클래스 정의
class YelpcrawlerSpider(CrawlSpider):
# 스파이더 이름 설정 (실행 시 이 이름을 사용)
name = "yelpcrawler"
# 크롤링 가능한 도메인 제한 (yelp.com 외의 도메인은 크롤링하지 않음)
allowed_domains = ["yelp.com"]
# 시작 URL 리스트: Yelp의 Berlin, Germany 지역 헬스장 검색 결과 페이지
start_urls = ["https://www.yelp.com/search?find_desc=Gyms&find_loc=Berlin%2C+Germany"]
# 크롤링 규칙 (Rules)을 튜플로 정의
rules = (
# 1. 첫 번째 규칙:
# URL에 "desc=Gyms"와 "start="가 포함된 페이지를 추출 (페이지네이션 처리)
# follow=True로 설정하여 추출된 링크들을 계속해서 따라감
Rule(LinkExtractor(allow=r"desc=Gyms.*start="), follow=True),
# 2. 두 번째 규칙:
# URL에 "biz/"와 "osq=Gyms" 패턴이 포함된 비즈니스 상세 페이지를 추출
# 'hrid'가 포함된 URL은 제외(deny='hrid')
# 해당 링크에 접근할 때 parse_item 콜백 함수를 호출하여 상세 정보를 추출
# follow=True로 설정하여 상세 페이지 내의 추가 링크도 추출할 수 있음
Rule(LinkExtractor(allow=r"biz/.*osq=Gyms", deny='hrid'), callback="parse_item", follow=True),
)
# 상세 페이지에서 정보를 추출하는 메서드
def parse_item(self, response):
# 1. 헬스장 이름 추출: h1 태그의 클래스 'y-css-olzveb'에서 텍스트 값을 가져옴
name = response.css('h1.y-css-olzveb::text').get()
# 2. 현재 상세 페이지의 URL 저장
url = response.url
# 3. 비즈니스 웹사이트 링크 추출:
# "Business website" 텍스트를 가진 p 태그의 바로 다음 p 태그 내 a 태그를 선택
a_tag = response.xpath('//p[text()="Business website"]/following-sibling::p[1]/a')
# 선택된 a 태그에서 텍스트 값을 추출 (표시된 웹사이트 주소)
url_on_website = a_tag.css('::text').get()
# 4. 웹사이트 링크 처리: 텍스트 값이 있을 경우와 없을 경우를 분기 처리
if url_on_website is not None:
# 만약 텍스트의 마지막 문자가 '…' (생략 기호)라면,
# a 태그의 href 속성 값을 사용하여 실제 웹사이트 링크를 가져옴
if url_on_website[-1] == '…':
website_link = a_tag.attrib['href']
else:
# 그렇지 않으면, 텍스트 자체를 웹사이트 링크로 사용
website_link = url_on_website
else:
# 웹사이트 정보가 없으면 'No info'로 설정
website_link = 'No info'
# 5. 전화번호 추출:
# "Phone number" 텍스트를 가진 p 태그의 바로 다음 p 태그에서 텍스트 값을 추출
phone = response.xpath('//p[text()="Phone number"]/following-sibling::p[1]/text()').get()
# 전화번호가 없을 경우 'No info'로 설정
if phone is None:
phone = 'No info'
# 6. 주소 추출:
# "Get Directions" 텍스트를 가진 a 태그의 부모 요소 바로 다음 p 태그에서 텍스트 값을 추출
address = response.xpath('//a[text()="Get Directions"]/../following-sibling::p[1]/text()').get()
# 주소 정보가 없을 경우 'No info'로 설정
if address is None:
address = 'No info'
# 7. 추출한 정보를 딕셔너리 형태로 yield하여 후속 파이프라인 또는 저장 과정으로 전달
yield {
'name': name, # 헬스장 이름
'url': url, # 상세 페이지 URL
'website_link': website_link, # 비즈니스 웹사이트 링크 (또는 'No info')
'phone': phone, # 전화번호 (또는 'No info')
'adress': address # 주소 (오타가 있으나 그대로 사용, 또는 'No info')
}
3) settings
ROBOTSTXT_OBEY = False
RETRY_ENABLE = True
RETRY_TIME = 20
RETRY_HTTP_CODES = [402]
1. ROBOTSTXT_OBEY = False
- 웹사이트의 robots.txt 파일에 명시된 크롤링 제한 규칙을 무시한다. 즉, 사이트가 크롤러 접근을 제한하고 있더라도 이를 따르지 않고 데이터를 수집하게 된다.
2. RETRY_ENABLE = True
- 요청 실패 시 재시도 기능을 활성화한다. 네트워크 오류나 서버 응답 문제 등으로 인해 요청이 실패할 경우 자동으로 재요청을 하게 된다.
3. RETRY_TIME = 20
- 재시도 관련 설정으로, 재시도 시도 간격이나 최대 재시도 횟수(또는 시간)로 해석될 수 있다. 일반적으로 20초의 간격을 두고 재요청을 시도하도록 설정한 것으로 볼 수 있다.
4. RETRY_HTTP_CODES = [402]
- HTTP 응답 코드 중 402(Payment Required)를 받았을 때 재시도를 수행하도록 지정한다. 일반적인 경우에는 다른 에러 코드(예: 500, 503 등)에 대해 재시도를 수행하지만, 이 설정은 402 코드에 대해 별도로 재요청을 시도한다.
7. SCRAPY - PLAYWRIGHT
1) pagespider.py
import scrapy
from ..items import QuoteItem # items.py에서 정의된 QuoteItem 클래스를 가져옴
from scrapy.selector import Selector # Scrapy의 Selector를 사용하여 HTML 문서를 파싱하기 위함
class PagespiderSpider(scrapy.Spider):
name = "pagespider" # 스파이더의 이름을 "pagespider"로 정의
# 시작 요청을 생성하는 메소드. Scrapy는 이 메소드에서 반환된 Request들을 처리함.
def start_requests(self):
# 'http://quotes.toscrape.com/js/' URL로 요청을 보내는데,
# meta 딕셔너리에 "playwright": True를 포함하여 Playwright를 통해 자바스크립트 렌더링을 하도록 함.
# "playwright_include_page": True 옵션은 Playwright 페이지 객체를 응답의 meta에 포함시킴.
yield scrapy.Request(
'http://quotes.toscrape.com/js/',
meta={
"playwright": True,
"playwright_include_page": True
}
)
# 비동기 parse 메소드: Playwright 페이지를 활용하여 동적으로 로드된 콘텐츠를 처리함.
async def parse(self, response):
# meta 딕셔너리에서 Playwright 페이지 객체를 가져옴.
playwright_page = response.meta['playwright_page']
# while 루프를 통해 여러 페이지를 순회하면서 데이터를 수집함.
while True:
# Playwright 페이지에서 현재 페이지의 HTML 콘텐츠를 가져옴.
content = await playwright_page.content()
# Scrapy의 Selector를 사용하여 HTML 콘텐츠를 파싱.
selector = Selector(text=content)
# CSS 선택자를 이용해 'div' 태그 중 class가 'quote'인 요소들을 모두 선택.
quotes = selector.css('div.quote')
# 각 인용구(quote) 요소에 대해 반복 처리.
for quote in quotes:
# QuoteItem 객체 생성 (items.py에 정의된 데이터 구조)
quote_item = QuoteItem()
# 인용구 텍스트 추출: span 태그의 class 'text' 내부의 텍스트.
quote_item['quote'] = quote.css('span.text::text').get()
# 인용구의 저자 추출: small 태그의 class 'author' 내부의 텍스트.
quote_item['author'] = quote.css('small.author::text').get()
# 인용구와 관련된 태그들을 문자열로 결합하여 저장.
tags_string = ''
tags = quote.css('a.tag::text').getall()
for tag in tags:
# 각 태그 뒤에 공백을 추가하여 하나의 문자열로 만듦.
tags_string += tag + ' '
# 공백 제거 후 tags 필드에 저장.
quote_item['tags'] = tags_string.strip()
# 수집된 아이템을 yield하여 Scrapy 파이프라인으로 전달.
yield quote_item
# 현재 페이지의 "다음" 버튼 링크를 찾기 위한 선택자.
# 주의: 여기서 response 객체는 최초 응답이므로, 동적으로 변경된 페이지의 링크는 playwright_page의 콘텐츠에서 가져와야 할 수도 있음.
next_button_link = response.css('li.next a::attr(href)').get()
# "다음" 버튼이 존재하면 페이지 이동 로직 실행
if next_button_link is not None:
# Playwright의 locator를 사용하여 "다음" 버튼을 찾음.
next_button = playwright_page.locator('li.next a')
# "다음" 버튼 클릭 (비동기 방식)
await next_button.click()
# 페이지가 로드될 때까지 잠시 대기 (여기서는 5000ms = 5초)
await playwright_page.wait_for_timeout(5000)
else:
# "다음" 버튼이 없으면 더 이상 페이지가 없으므로 while 루프를 종료
break
1. 코드 동작 설명
- 시작 요청 및 Playwright 설정
- start_requests 메소드에서는 Scrapy Request를 생성하는데, meta 딕셔너리에 "playwright": True 옵션을 추가하여 요청 시 Playwright가 자바스크립트를 렌더링하도록 설정한다.
- "playwright_include_page":
- True 옵션은 렌더링된 페이지 객체를 이후에 사용하기 위해 응답 메타에 포함시킨다.
- 비동기 파싱 처리
- parse 메소드는 비동기 함수로 작성되어 있으며, Playwright 페이지 객체를 사용해 동적으로 로드된 HTML 콘텐츠를 가져온다.
- await playwright_page.content()를 통해 현재 페이지의 전체 HTML을 받아오고, 이를 Scrapy의 Selector에 전달해 파싱한다.
- 데이터 추출
- 파싱된 HTML에서 div.quote 요소들을 선택하여, 각 인용구의 텍스트, 저자, 그리고 태그 정보를 추출한다.
- 각 태그는 문자열로 결합되어 quote_item['tags'] 필드에 저장된다.
- 생성된 QuoteItem 객체는 yield를 통해 Scrapy 파이프라인에 넘겨진다.
- 페이지 네비게이션 (다음 페이지로 이동)
- 현재 페이지에서 "다음" 버튼이 있는지 확인한다. 만약 버튼이 존재한다면, Playwright의 locator를 사용해 해당 버튼을 선택한 후 클릭한다.
- 클릭 후 wait_for_timeout을 통해 5초간 대기하여 새 페이지가 로드될 시간을 확보한다.
- "다음" 버튼이 없으면 while 루프를 종료하여 스파이더가 종료된다.
2) quotespider.py
from typing import Iterable
import scrapy
from scrapy import Request
from ..items import QuoteItem # items.py에 정의된 QuoteItem 클래스를 가져옴
class QuotespiderSpider(scrapy.Spider):
name = "quotespider" # 스파이더의 이름을 "quotespider"로 지정
def start_requests(self):
# 시작 URL인 'http://quotes.toscrape.com/js/'에 요청을 보냅니다.
# meta 딕셔너리에 {"playwright": True}를 포함시켜 Playwright를 사용해 자바스크립트가 렌더링된 페이지를 가져오도록 설정합니다.
yield scrapy.Request('http://quotes.toscrape.com/js/', meta={"playwright": True})
def parse(self, response):
# CSS 선택자를 사용하여 response 내에서 모든 인용구 요소(div 태그의 class가 "quote"인 요소)를 선택합니다.
quotes = response.css('div.quote')
# 각 인용구 요소를 순회하며 데이터를 추출합니다.
for quote in quotes:
# QuoteItem 인스턴스를 생성하여 데이터를 저장할 객체를 만듭니다.
quote_item = QuoteItem()
# 인용구 텍스트 추출: span 태그의 class "text" 내부의 텍스트를 가져옵니다.
quote_item['quote'] = quote.css('span.text::text').get()
# 인용구 저자 추출: small 태그의 class "author" 내부의 텍스트를 가져옵니다.
quote_item['author'] = quote.css('small.author::text').get()
# 인용구에 관련된 태그들을 추출하고 하나의 문자열로 결합합니다.
tags_string = ''
# a 태그 중 class "tag"인 요소들의 텍스트를 모두 가져옵니다.
tags = quote.css('a.tag::text').getall()
for tag in tags:
# 각 태그에 공백을 추가하면서 tags_string에 누적
tags_string += tag + ' '
# 누적된 문자열의 앞뒤 공백 제거 후 'tags' 필드에 저장합니다.
quote_item['tags'] = tags_string.strip()
# 추출한 아이템을 yield하여 Scrapy 파이프라인 또는 저장 로직으로 전달합니다.
yield quote_item
# "다음 페이지" 버튼의 링크를 찾습니다.
# li 태그의 class "next" 내부의 a 태그의 href 속성을 가져옵니다.
next_button_link = response.css('li.next a::attr(href)').get()
if next_button_link is not None:
# 다음 페이지가 존재하면, response.follow()를 사용하여 상대 URL을 기반으로 새로운 요청을 만듭니다.
# callback은 다시 현재 parse 함수를 지정하여, 새 페이지에서도 같은 방식으로 데이터를 추출하도록 합니다.
# meta에 {"playwright": True}를 포함시켜 새 페이지에서도 자바스크립트 렌더링을 수행합니다.
yield response.follow(next_button_link, callback=self.parse, meta={"playwright": True})
1. 코드 동작 상세 설명
- 시작 요청 및 Playwright 활용
- start_requests 메소드에서 지정한 URL 'http://quotes.toscrape.com/js/'로 요청을 보낸다.
- meta={"playwright": True} 옵션을 사용하면, 해당 페이지가 자바스크립트로 동적으로 생성된 콘텐츠를 포함하고 있을 때 Playwright가 이를 렌더링해준다.
- 데이터 추출 과정 (parse 메소드)
- parse 메소드에서는 응답 객체를 받아 CSS 선택자를 통해 인용구가 담긴 요소(div.quote)들을 선택한다.
- 각 인용구 요소에 대해:
- 텍스트:
- span.text 요소에서 인용구의 내용을 추한다.
- 저자:
- small.author 요소에서 저자 이름을 추출합니다.
- 태그:
- a.tag 요소들을 모두 선택하여, 각 태그 텍스트를 하나의 문자열로 연결한 후 저장한다.
- 각 인용구 데이터를 QuoteItem 객체에 저장한 후, yield로 반환하여 후속 처리(예: 파이프라인 저장, 로그 기록 등)를 진행한다.
- 페이지 네비게이션
- 페이지 하단에 위치한 "다음" 버튼을 나타내는 li.next a 요소의 href 속성을 추출한다.
- 만약 "다음" 버튼 링크가 존재하면 response.follow() 메소드를 사용해 해당 링크로 새 요청을 보낸다.
- 이때도 meta={"playwright": True} 옵션을 포함시켜 새 페이지에서도 자바스크립트 렌더링이 적용되도록 한다.
- 새 페이지의 응답은 다시 parse 메소드로 전달되어 동일한 데이터 추출 과정을 반복한다.
3) waitspider.py
import scrapy
from ..items import QuoteItem # items.py 파일에 정의된 QuoteItem 클래스를 불러옴
from scrapy_playwright.page import PageMethod # Playwright의 페이지 메서드를 사용하기 위한 클래스
class WaitspiderSpider(scrapy.Spider):
name = "waitspider" # 스파이더의 이름 지정
def start_requests(self):
# 시작 요청: 'http://quotes.toscrape.com/js-delayed/' URL로 요청을 보냄.
# meta 옵션에 "playwright": True를 추가하여 Playwright를 사용해 페이지를 렌더링합니다.
# 또한 "playwright_page_methods" 옵션에 PageMethod 객체 리스트를 전달하여
# 요청 시 Playwright 페이지에서 특정 동작(여기서는 요소가 로드될 때까지 대기)을 실행합니다.
yield scrapy.Request(
'http://quotes.toscrape.com/js-delayed/',
meta={
"playwright": True,
"playwright_page_methods": [
# 아래 주석 처리된 코드는 타임아웃(지정된 시간만큼 대기)을 설정하는 방법입니다.
# PageMethod('wait_for_timeout', 10000), # 10초 동안 대기
# 대신 페이지에 'div.quote' 요소가 나타날 때까지 기다립니다.
PageMethod('wait_for_selector', 'div.quote')
]
}
)
def parse(self, response):
# 응답(response) 객체에서 CSS 선택자로 'div.quote' 요소들을 모두 선택합니다.
quotes = response.css('div.quote')
# 각 인용구(quote) 요소에 대해 데이터를 추출합니다.
for quote in quotes:
# QuoteItem 인스턴스를 생성하여 추출한 데이터를 저장할 객체를 만듭니다.
quote_item = QuoteItem()
# 인용구 텍스트 추출: span 태그의 class "text" 내부의 텍스트를 가져옵니다.
quote_item['quote'] = quote.css('span.text::text').get()
# 저자 정보 추출: small 태그의 class "author" 내부의 텍스트를 가져옵니다.
quote_item['author'] = quote.css('small.author::text').get()
# 태그 정보 추출: a 태그의 class "tag" 요소들의 텍스트를 모두 가져와 하나의 문자열로 만듭니다.
tags_string = ''
tags = quote.css('a.tag::text').getall()
for tag in tags:
# 각 태그 뒤에 공백 추가 후 누적
tags_string += tag + ' '
# 누적된 문자열의 앞뒤 공백을 제거하여 'tags' 필드에 저장
quote_item['tags'] = tags_string.strip()
# 추출한 아이템을 yield하여 파이프라인이나 저장 로직으로 넘깁니다.
yield quote_item
# "다음" 버튼 링크를 찾습니다.
# li 태그의 class "next" 내부의 a 태그의 href 속성을 추출합니다.
next_button_link = response.css('li.next a::attr(href)').get()
if next_button_link is not None:
# 만약 "다음" 버튼이 존재하면, response.follow()를 사용해 다음 페이지로 이동하는 요청을 생성합니다.
# meta 옵션에 다시 "playwright": True와 playright_page_methods 리스트를 전달하여
# 새 페이지에서도 동일하게 자바스크립트 렌더링과 요소 대기를 수행하도록 설정합니다.
yield response.follow(
next_button_link,
callback=self.parse,
meta={
"playwright": True,
"playwright_page_methods": [
# 10초 대기 대신 'div.quote' 요소가 나타날 때까지 대기하는 방식
PageMethod('wait_for_selector', 'div.quote')
]
}
)
1. 코드 동작 상세 설명
- 시작 요청 및 Playwright 설정
- start_requests 메소드에서는 http://quotes.toscrape.com/js-delayed/ URL로 요청을 보낸다.
- meta 딕셔너리에 "playwright": True를 설정, 요청 시 Playwright를 통해 자바스크립트 렌더링을 하도록 지정한다.
- "playwright_page_methods" 옵션은 Playwright 페이지에서 실행할 메서드를 지정한다.
- 여기서는 PageMethod('wait_for_selector', 'div.quote')를 사용하여, 페이지에 인용구 요소(div.quote)가 나타날 때까지 대기하도록 설정한다.
- (주석 처리된 wait_for_timeout은 고정 시간만큼 대기하는 방법으로, 요소가 로드되었는지 확실하지 않은 경우에 사용할 수 있다.)
- 데이터 추출 (parse 메소드)
- parse 메소드는 응답(response) 객체에서 CSS 선택자를 사용해 모든 div.quote 요소를 선택한다.
- 각 인용구 요소에 대해:
- 텍스트 추출: span.text 내부의 텍스트를 가져와 quote_item['quote']에 저장한다.
- 저자 추출: small.author 내부의 텍스트를 가져와 quote_item['author']에 저장한다.
- 태그 추출: a.tag 요소들의 텍스트를 모두 가져와, 공백으로 구분된 하나의 문자열로 결합하여 quote_item['tags']에 저장한다.
- 추출된 각 QuoteItem 객체는 yield되어 이후 파이프라인이나 저장 단계로 전달된다.
- 페이지 네비게이션
- 현재 페이지 하단에 있는 "다음" 버튼(li.next a)의 링크를 추출한다.
- 만약 "다음" 버튼 링크가 존재하면, response.follow()를 사용하여 상대 URL을 기반으로 새로운 요청을 생성한다.
- 새 요청에서도 meta 딕셔너리에 "playwright": True와 playwright_page_methods 옵션을 전달하여, 새 페이지에서도 자바스크립트 렌더링과 대기 동작이 수행되도록 한다.
- 이렇게 함으로써 지연되어 로드되는 페이지에서도 데이터가 완전히 로드된 후에 파싱이 이루어진다.
4) settings
- 기본설정
DOWNLOAD_HANDLERS = {
"http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
"https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
}
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
- reference :
https://brunch.co.kr/@jamescompany/82
02화 Playwright 시작하기
첫 번째. | Playwright란? Playwright는 웹 브라우저 자동화 도구로, 현대적인 웹 애플리케이션 테스트를 손쉽게 수행할 수 있도록 설계되었습니다. Selenium과 유사한 기능을 제공하지만, 더 빠르고 안
brunch.co.kr
https://github.com/scrapy-plugins/scrapy-playwright
GitHub - scrapy-plugins/scrapy-playwright: 🎭 Playwright integration for Scrapy
🎭 Playwright integration for Scrapy. Contribute to scrapy-plugins/scrapy-playwright development by creating an account on GitHub.
github.com
'Crawling' 카테고리의 다른 글
SCRAPY 프레임워크의 사용 방법 정리 (2) (0) | 2025.02.28 |
---|---|
SCRAPY 프레임워크의 사용 방법 정리 (1) (0) | 2025.02.27 |
[udemy] Web Scraping with Python: BeautifulSoup, Requests & Selenium 학습 정리 (0) | 2025.02.23 |
BeautifulSoup4 기본 사용방법 정리 (0) | 2025.02.22 |
BeautifulSoup4 매뉴얼 정리 (0) | 2025.02.06 |
댓글