본문 바로가기
주식 데이터 분석

네이버 주식 관련 블로그 크롤링 (파이썬)

by HanYaung 2024. 6. 16.

목표

  • 네이버 블로그에서 일정 기간동안 “종목명 + 관련주” 검색어로 나오는 모든 블로그 글들을 크롤링 하고자 함

특이사항

  • selenium이 아닌 playwright를 사용하여 동적 크롤링 진행
    • 이유 : 패키지 설치 편의성과 크롤링 코드 작성 편의성
  • async와 세마포어를 이용한 크롤링
    • 멀티스레드, 멀티프로세스를 사용하지 않은 이유 : i/o 작업에서 자원을 많이 사용하지 않기 위해
    • 세마포어를 사용한 이유 : 네이버에서 일정 속도로 크롤링을 하면 IP를 차단 당함 이를 방지하기 위해 브라우저 수와 속도를 제한 함
      • 컴퓨터 성능, 네트워크에 따라
      • semaphore = asyncio.Semaphore(8), await page.goto(url, timeout=10000)를 최적화하여 더 빠른 크롤링이 가능
  • 블로그 글 무한 스크롤 처리

코드

from playwright.sync_api import sync_playwright
import pandas as pd
from datetime import datetime
from playwright.async_api import async_playwright
import asyncio
from datetime import timedelta, datetime
import time
import tqdm
from tqdm.asyncio import tqdm

class BlogCrawler:
    async def crawler(self, semaphore, context,  name, date):
        async with semaphore:
            try:
                page = await context.new_page()
                url = f'<https://search.naver.com/search.naver?ssc=tab.blog.all&query={name}> 관련주&sm=tab_opt&nso=so%3Ar%2Cp%3Afrom{date}to{date}'
                await page.goto(url, timeout=10000) # 너무 빠른 페이지 이동시 검색 제한 됨, 스크롤로 블로그 글들을 가져오는건 막히지 않는거 같음
                while True:
                    element = await page.query_selector('h3.title')
                    if element:
                        text = await element.text_content()
                        if "검색 서비스 이용이 제한되었습니다." in text:
                            # print("제한 메시지가 확인되었습니다."+name+date)
                            await page.goto(url, timeout=2000) # 검색 제한시 다시 검색
                    else:
                        break

                last_height = await page.evaluate('document.body.scrollHeight')
                last_time = time.time()

                try:
                    while True:
                        # 스크롤 다운
                        await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
                        # 현재 시간 가져오기
                        current_time = time.time()

                        # 마지막 높이 체크 이후 2초가 지났는지 확인
                        if current_time - last_time > 2:
                            # 새로운 스크롤 높이를 계산
                            new_height = await page.evaluate('document.body.scrollHeight')
                            if new_height == last_height:
                                # 높이 변화가 없으면 종료
                                break
                            else:
                                # 높이 변화가 있으면 업데이트
                                last_height = new_height
                                last_time = time.time()  # 마지막 체크 시간 업데이트
                        # time.sleep(0.1)  # 너무 빠른 스크롤 방지를 위해 짧은 대기 시간 추가
                except Exception as e:
                    print(e)

                title_elements = await page.query_selector_all('div.title_area > a')
                title_list = [await title.text_content() for title in title_elements]
                href_list = [await title.get_attribute('href') for title in title_elements]

                header_elements = await page.query_selector_all('div.dsc_area > a.dsc_link')
                header_list = [await header.text_content() for header in header_elements]

                if len(title_list) != len(header_list):
                    header_list = [None] * len(title_list)  # 헤더가 없는 경우 None으로 채움

                df = pd.DataFrame({
                    'name': [name] * len(title_list),
                    'title': title_list,
                    'header': header_list,
                    'href': href_list,
                    'date': [date] * len(title_list)
                })
                return df
            except Exception as e:
                # print(e)
                # print(name,date,"error")
                return

            finally:
                try:
                    await context.clear_cookies()
                    await page.evaluate("localStorage.clear()")
                    await page.evaluate("sessionStorage.clear()")
                    await page.close()
                except Exception as e:
                    print(f"자원 정리 중 오류 발생: {e}")  # 자원 정리 중 발생한 예외를 캐치하여 로깅
    async def blog_crawler(self, name_list, start_date, end_date):
        if isinstance(start_date, datetime):
            start_date = start_date.date()
        elif isinstance(start_date, str):
            start_date = datetime.strptime(start_date, '%Y%m%d').date()

        if isinstance(end_date, datetime):
            end_date = end_date.date()
        elif isinstance(end_date, str):
            end_date = datetime.strptime(end_date, '%Y%m%d').date()
        date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
        print(date_range)

        if not date_range:
            return False

        async with async_playwright() as p:
            # chrome gpu 가속에 메모리 사용량이 너무 커서 끔
            browser = await p.chromium.launch(headless=False, args=['--disable-gpu'])
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36')
            df_list = []

            semaphore = asyncio.Semaphore(8) #4
            futures =[]
            for name in name_list:
                for date in date_range:
                    futures.append(self.crawler(semaphore, context, name, date.strftime('%Y%m%d')))

            for future in tqdm(asyncio.as_completed(futures), total=len(futures), disable=True):
                tmp_df = await future
                if tmp_df is not None:
                    df_list.append(tmp_df)

            df = pd.concat(df_list, ignore_index=True)
            await browser.close()
            return df

 asyncio.run(crawler_blog.blog_crawler(company_name, start_date, end_date))