|
- #!/usr/bin/python3
- # My first attempt at building a Selenium crawler. It works, but many parts of the code are unnecessary or
- # could have been written better.
- from selenium import webdriver
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.webdriver.common.action_chains import ActionChains
- from selenium.webdriver.common.keys import Keys
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.common.by import By
- from selenium.common.exceptions import ElementClickInterceptedException, ElementNotInteractableException
- from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
- from PIL import Image
-
- import base64
- import selenium
- import os, traceback, sys
- import random
- import json
- import time
- import requests
-
class Spider():
    """Selenium crawler that logs in, pages through a result list and downloads
    the selected records batch by batch.

    All site-specific values are read from a JSON config file. The keys this
    class reads are: ``entry`` (list of two URLs: login page, result list),
    ``username``, ``password``, ``waitTime`` (seconds for every fixed pause),
    ``captchaKey``, ``captchaPOST``, ``captchaGET``, ``completed``,
    ``endPosition``, ``interval`` and ``cleanMode`` (``[enabled, start, stop]``).

    Constructing an instance runs the whole crawl: ``__init__`` opens Firefox
    and immediately calls ``start()``.
    """

    # Amount ``completed`` advances per download batch. selectRecords() also
    # walks this many result pages per call, so the two stay in step.
    BATCH_SIZE = 10

    # (left, upper, right, lower) pixel box of the captcha inside a full-page
    # screenshot. NOTE(review): these are fixed screen coordinates, so they
    # presumably only hold for one window size/layout -- confirm before reuse.
    CAPTCHA_CROP_BOX = (530, 405, 635, 440)

    # Default config location; __init__ overrides it per instance.
    configPath = 'login.json'

    def __init__(self, configPath='login.json'):
        """Load the config, open Firefox and run the crawl.

        Args:
            configPath: JSON config file to read, and to write progress back to.
        """
        self.keys = webdriver.common.keys
        self.configPath = configPath
        self.settings = self.getConfig(configPath)

        # ``firefox_profile=`` was removed from webdriver.Firefox in current
        # Selenium releases; attaching the profile to an Options object works
        # with both Selenium 3 and Selenium 4.
        options = webdriver.FirefoxOptions()
        options.profile = self.getDriverPrefs()
        self.driver = webdriver.Firefox(options=options)
        self.driver.implicitly_wait(10)
        # Random pause (seconds) used between download batches in getData().
        self.waitTime = random.randint(4, 10)

        self.start()

    def getDriverPrefs(self):
        """Return a Firefox profile that saves CSV downloads without prompting.

        Downloads go to the ``Results`` directory under the current working
        directory.
        """
        fp = webdriver.FirefoxProfile()
        # folderList=2 tells Firefox to use the custom directory set below.
        fp.set_preference("browser.download.folderList", 2)
        fp.set_preference("browser.download.manager.showWhenStarting", False)
        fp.set_preference("browser.download.dir",
                          os.path.join(os.getcwd(), 'Results', ''))
        fp.set_preference("browser.helperApps.neverAsk.saveToDisk", 'text/csv')
        return fp

    def getConfig(self, fpath):
        """Read the JSON file at ``fpath`` and return the parsed object."""
        with open(fpath, 'r', encoding='utf-8') as login_file:
            return json.load(login_file)

    def start(self):
        """Log in, then run the crawl with progress saving and clean-up.

        Raises:
            AssertionError: if the entry page title lacks "Authorization".
        """
        self.driver.get(self.settings["entry"][0])
        # Raised explicitly (not via ``assert``) so the check survives ``-O``.
        if "Authorization" not in self.driver.title:
            raise AssertionError(
                f'Unexpected login page title: {self.driver.title!r}')

        self.loginClicks()  # Logs in, navigates through prompts and terms pages

        try:
            self.cleanSelects()
            self.getData()
            self.gracefulQuit()
        except KeyboardInterrupt:
            print("Shutdown requested...exiting")
            self.gracefulQuit()
        except TimeoutException:
            # Restart from the saved position. The old browser must be closed
            # first, otherwise every restart leaks a Firefox instance.
            self._closeDriver()
            self.__init__(self.configPath)
        except Exception:
            traceback.print_exc(file=sys.stdout)
            # Progress was already saved after the last finished batch; only
            # the browser still needs releasing.
            self._closeDriver()

    def _closeDriver(self):
        """Quit the browser if one is open; safe to call more than once."""
        driver = getattr(self, 'driver', None)
        if driver is None:
            return
        self.driver = None
        try:
            driver.quit()
        except Exception:
            # Best effort: the session may already be dead (e.g. after a
            # timeout); report it and carry on with the shutdown.
            traceback.print_exc(file=sys.stdout)

    def _find(self, selector):
        """Find one element by CSS selector, solving a captcha if it blocks."""
        return self.handleCaptcha(self.driver.find_element,
                                  By.CSS_SELECTOR, selector)

    def loginClicks(self):
        """Fill in the credentials and click through the agreement pages."""
        find = self.driver.find_element
        find(By.ID, "userid").send_keys(self.settings["username"])
        find(By.ID, "pin").send_keys(self.settings["password"])
        self._find('input.btn-primary').click()
        find(By.ID, "chkAgree").click()
        self.wait()
        find(By.CLASS_NAME, "action-agree").click()
        req = find(By.CSS_SELECTOR, '.CaBusiness')
        self.realClick(find(By.CSS_SELECTOR, '.CaBusiness a'), req)

    def handleCaptcha(self, x, *args):
        """Call ``x(*args)``; on a timeout / missing element solve the captcha
        and call it once more.

        Returns:
            Whatever ``x(*args)`` returns. A failure of the second attempt
            propagates to the caller.
        """
        try:
            return x(*args)
        except (TimeoutException, NoSuchElementException):
            self.solveCaptcha()
            return x(*args)

    def solveCaptcha(self):
        """Screenshot the captcha, send it to the solving service and submit
        the answer, repeating while the captcha image is still on the page."""
        print('handling captcha')
        self.wait()
        body = self.driver.find_element(By.CSS_SELECTOR, 'body')
        self.driver.switch_to.default_content()
        body.click()
        self.wait()

        captcha = self.driver.find_element(By.CSS_SELECTOR, '#captcha-image')
        image = self.pullCaptcha(captcha, "captcha.png")
        key = self.postCaptcha(image)
        self.wait()
        answer = self.getCaptcha(key)
        if answer is None:
            # getCaptcha() already refreshed the captcha and re-ran this method.
            return
        self.captchaClicks(answer)
        if self.driver.find_elements(By.CSS_SELECTOR, '#captcha-image'):
            print('captcha was wrong')
            self.solveCaptcha()

    def pullCaptcha(self, element, path, box=None):
        """Screenshot the page, crop it to the captcha and return PNG bytes.

        Args:
            element: the captcha element. Unused: the crop uses fixed
                coordinates; kept so existing callers keep working.
            path: file the screenshot, then the cropped image, is written to.
            box: optional (left, upper, right, lower) crop box; defaults to
                ``CAPTCHA_CROP_BOX``.
        """
        if box is None:
            box = self.CAPTCHA_CROP_BOX
        # Saves a screenshot of the entire page.
        self.driver.save_screenshot(path)

        # crop() loads the pixel data, so the cropped copy stays valid after
        # the source file handle is closed.
        with Image.open(path) as screenshot:
            cropped = screenshot.crop(box)
        cropped.save(path, format='png')

        with open(path, "rb") as image_file:
            return image_file.read()

    def postCaptcha(self, image):
        """Upload the captcha image and return the service's request id.

        Raises:
            RuntimeError: if the response text contains ``ERROR``.
        """
        params = {'json': 1, 'key': self.settings["captchaKey"],
                  'method': 'post', 'calc': 1}
        files = {'file': ('file.png', image, 'image/png', {'Expires': '0'})}
        response = requests.post(self.settings["captchaPOST"], files=files,
                                 timeout=10, params=params)
        print(response.text)
        if 'ERROR' in response.text:
            raise RuntimeError(f'Captcha Error: {response.text}')

        return response.json()['request']

    def _retryCaptcha(self):
        """Request a fresh captcha image and solve that one instead."""
        self.selectAndClick('#captcha-refresh')
        self.solveCaptcha()

    def getCaptcha(self, i):
        """Poll the solving service for the answer to request ``i``.

        Returns:
            The answer, or ``None`` when the captcha was refreshed and solved
            again through ``solveCaptcha()``.

        Raises:
            RuntimeError: if the response text contains ``ERROR``.
        """
        print(i)
        params = {'json': 1, 'key': self.settings["captchaKey"],
                  'action': 'get', 'id': i}
        while True:
            response = requests.get(self.settings["captchaGET"], timeout=10,
                                    params=params)
            text = response.text
            print(text)
            # The order of these checks matters: the UNSOLVABLE test has to
            # come before the generic ERROR test.
            # NOTE(review): '+', '-' and '=' presumably mean the service sent
            # back the expression instead of its result -- confirm.
            if 'UNSOLVABLE' in text or '+' in text or '-' in text:
                return self._retryCaptcha()
            if 'ERROR' in text:
                raise RuntimeError(f'Captcha Error: {text}')
            if '=' in text:
                return self._retryCaptcha()
            if 'NOT_READY' not in text:
                break
            time.sleep(10)
        a = response.json()['request']
        print(a)
        return a

    def selectAndClick(self, s):
        """Find the element matching CSS selector ``s``, click it and pause."""
        self.handleCaptcha(self._find(s).click)
        self.wait()

    def captchaClicks(self, answer):
        """Type ``answer`` into the captcha box and press the validate button."""
        # Only the last wrapper is needed (in case the solution is wrong); the
        # other handleCaptcha wrappers are extra.
        inputBox = self._find('#Attempt')
        self.handleCaptcha(inputBox.clear)
        self.handleCaptcha(inputBox.send_keys, answer)
        self.wait()
        btn = self._find(
            '.action-validate-captcha.originButtonCompact.ui-priority-primary')
        self.handleCaptcha(btn.click)

    def realClick(self, target, req):
        """Move the pointer over ``req``, then move to ``target`` and click it.

        Used where the site needs special click event combinations.
        """
        ActionChains(self.driver).move_to_element(req).perform()
        ActionChains(self.driver).move_to_element(target).click(target).perform()

    def getData(self):
        """Run download batches until ``endPosition`` is reached or
        ``interval`` batches are done, saving progress after each one."""
        if self.settings["completed"] >= self.settings["endPosition"]:
            print("Already at end position")
            self.gracefulQuit()
            return

        # Clear selects that may have been left by the last instance.
        self.driver.get(self.settings['entry'][1])
        self.insertPosition(self.settings['completed'])
        self.selectRecords(False)

        batches = 0
        while (self.settings["completed"] < self.settings["endPosition"]
               and batches < self.settings["interval"]):
            self.clickSequence()
            self.saveCompleted()
            time.sleep(self.waitTime)
            batches += 1

    def insertPosition(self, p):
        """Type position ``p`` into the pager input and press RETURN.

        Retries without limit while the page raises stale-element or
        not-interactable errors.
        """
        while True:
            try:
                pager = self._find('div.page.click-enterkey:not(.disabled)')
                self.wait()
                pager.click()
                self.wait()
                inputBox = self._find('input:not(#Attempt)')
                self.wait()
                self.handleCaptcha(inputBox.send_keys, str(p))
                self.handleCaptcha(inputBox.send_keys, Keys.RETURN)
                return
            except (StaleElementReferenceException,
                    ElementNotInteractableException):
                # The page changed underneath us; let it settle and start over
                # (a loop, so long outages cannot exhaust the call stack).
                self.wait()
                self.wait()

    def wait(self):
        """Sleep for the configured ``waitTime`` seconds."""
        time.sleep(self.settings['waitTime'])

    def selectRecords(self, ifSelect):
        """Tick (``ifSelect`` true) or untick (false) the select-all box on
        ``BATCH_SIZE`` consecutive pages, clicking "next" after each page."""
        # NOTE: this may create a bug where some records are missed because of
        # a captcha prompt.
        for _ in range(self.BATCH_SIZE):
            self.wait()
            allButton = self._find('#checkboxCol.action-tag-all')
            unchecked = allButton.get_attribute('checked') is None
            # Click only when the current state differs from the wanted one.
            if unchecked == ifSelect:
                self.handleCaptcha(allButton.click)
            self.wait()

            nextButton = self._find(".next.button")
            self.handleCaptcha(nextButton.click)

    def clickSequence(self):
        """Download one batch: select records from the next position, export
        them, deselect them again and advance ``completed`` by ``BATCH_SIZE``."""
        url = self.settings["entry"][1]
        self.driver.get(url)
        startPosition = str(self.settings["completed"] + 1)

        self.handleCaptcha(self.insertPosition, startPosition)
        time.sleep(3)
        self.selectRecords(True)
        self._find("a.download.action-download").click()
        self.wait()
        self._find("#detailDetail").click()
        self._find('.originButton.ui-priority-primary.view-on-exportlimit'
                   '.view-on-randomsample.action-download').click()
        self.wait()

        # Records need to be deselected to select new ones.
        self.driver.get(url)
        self.handleCaptcha(self.insertPosition, startPosition)
        self.selectRecords(False)

        self.settings["completed"] += self.BATCH_SIZE
        print(f'completed: {self.settings["completed"]}')

    @staticmethod
    def _batchCount(start, stop):
        """Return how many whole ``BATCH_SIZE`` batches lie between two positions."""
        # abs(): the former ``start - stop`` was negative whenever
        # start < stop, which made the clean-up loop a silent no-op.
        return int(abs(stop - start) / Spider.BATCH_SIZE)

    def cleanSelects(self):
        """When ``cleanMode`` is enabled, deselect the records between its
        start and stop positions; otherwise do nothing."""
        cleanMode = self.settings["cleanMode"]
        if not cleanMode[0]:
            return
        start, stop = cleanMode[1], cleanMode[2]

        self.driver.get(self.settings["entry"][1])
        self.handleCaptcha(self.insertPosition, str(start))

        # This will have side effects: every batch pages forward from ``start``.
        for _ in range(self._batchCount(start, stop)):
            self.selectRecords(False)

    def saveCompleted(self):
        """Write the current settings (including progress) back to the config
        file they were loaded from."""
        with open(self.configPath, 'w', encoding='utf-8') as login_file:
            login_file.write(json.dumps(self.settings, indent=4, sort_keys=True))

    def gracefulQuit(self):
        """Save progress, then close the browser."""
        self.saveCompleted()
        self._closeDriver()
-
-
def main():
    """Script entry point: constructing the Spider runs the whole crawl."""
    spider = Spider()


if __name__ == "__main__":
    main()
|