|
|
@@ -0,0 +1,331 @@ |
|
|
|
#!/usr/bin/python3
|
|
|
# My first attempt at building a Selenium crawler. It works, but many parts of the code are unnecessary or
# could have been written better.
|
|
|
from selenium import webdriver |
|
|
|
from selenium.webdriver.support.ui import WebDriverWait |
|
|
|
from selenium.webdriver.common.action_chains import ActionChains |
|
|
|
from selenium.webdriver.common.keys import Keys |
|
|
|
from selenium.webdriver.support import expected_conditions as EC |
|
|
|
from selenium.webdriver.common.by import By |
|
|
|
from selenium.common.exceptions import ElementClickInterceptedException, ElementNotInteractableException |
|
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException |
|
|
|
from PIL import Image |
|
|
|
|
|
|
|
import base64 |
|
|
|
import selenium |
|
|
|
import os, traceback, sys |
|
|
|
import random |
|
|
|
import json |
|
|
|
import time |
|
|
|
import requests |
|
|
|
|
|
|
|
class Spider():
    """Firefox/Selenium crawler that logs in, pages through records and downloads them.

    Behaviour is driven by a JSON settings file. The keys read by this class
    are ``entry`` (two URLs: login page and results page), ``username``,
    ``password``, ``waitTime``, ``captchaKey``, ``captchaPOST``,
    ``captchaGET``, ``completed``, ``endPosition``, ``interval`` and
    ``cleanMode`` (``[enabled, start, stop]``). The whole settings dict is
    written back to the same file after every batch so a later run can resume.

    Note that constructing a ``Spider`` immediately runs the whole crawl,
    because ``__init__`` ends by calling ``start()``.
    """

    # Settings file that getConfig() reads and saveCompleted() rewrites.
    configPath = 'login.json'

    def __init__(self):
        """Load the settings, launch Firefox and run the crawl."""
        self.keys = webdriver.common.keys
        self.settings = self.getConfig(self.configPath)

        fp = self.getDriverPrefs()
        self.driver = webdriver.Firefox(firefox_profile=fp)
        self.driver.implicitly_wait(10)
        # Randomised pause (seconds) used between batches in getData().
        self.waitTime = random.randint(4, 10)

        self.start()

    def getDriverPrefs(self):
        """Return a Firefox profile that saves CSV downloads to ``<cwd>/Results/``."""
        fp = webdriver.FirefoxProfile()
        # presumably 2 means "use the directory in browser.download.dir" --
        # confirm against the Firefox preference documentation.
        fp.set_preference("browser.download.folderList", 2)
        fp.set_preference("browser.download.manager.showWhenStarting", False)
        # The trailing '' keeps the trailing path separator the original string had.
        fp.set_preference("browser.download.dir",
                          os.path.join(os.getcwd(), 'Results', ''))
        fp.set_preference("browser.helperApps.neverAsk.saveToDisk", 'text/csv')
        return fp

    def getConfig(self, fpath):
        """Parse the UTF-8 JSON file at ``fpath`` and return its contents."""
        with open(fpath, 'r', encoding='utf-8') as login_file:
            return json.load(login_file)

    def start(self):
        """Open the login page, sign in and run the crawl with error handling.

        Raises:
            AssertionError: if the first entry URL does not show a page whose
                title contains "Authorization".
        """
        self.driver.get(self.settings["entry"][0])
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if "Authorization" not in self.driver.title:
            raise AssertionError(
                f'Unexpected login page title: {self.driver.title!r}')

        self.loginClicks()  # Logs in, navigates through prompts and terms pages

        # Progress is saved on normal completion and on Ctrl-C.
        try:
            self.cleanSelects()
            self.getData()
            self.gracefulQuit()
        except KeyboardInterrupt:
            print("Shutdown requested...exiting")
            self.gracefulQuit()
        except TimeoutException:
            # Restart from scratch. Close the old browser first, otherwise
            # every retry leaves another Firefox instance running.
            self.driver.quit()
            self.__init__()
        except Exception:
            traceback.print_exc(file=sys.stdout)

    def loginClicks(self):
        """Fill in the credentials and click through the agreement pages."""
        self.driver.find_element(By.ID, "userid").send_keys(self.settings["username"])
        self.driver.find_element(By.ID, "pin").send_keys(self.settings["password"])
        self.handleCaptcha(self.driver.find_element,
                           By.CSS_SELECTOR, 'input.btn-primary').click()
        self.driver.find_element(By.ID, "chkAgree").click()
        self.wait()
        self.driver.find_element(By.CLASS_NAME, "action-agree").click()
        req = self.driver.find_element(By.CSS_SELECTOR, '.CaBusiness')
        self.realClick(
            self.driver.find_element(By.CSS_SELECTOR, '.CaBusiness a'), req)

    def handleCaptcha(self, x, *args):
        """Call ``x(*args)``; on a lookup failure solve the captcha and retry once.

        A ``TimeoutException`` or ``NoSuchElementException`` is treated as a
        sign that a captcha prompt is in the way. Any exception raised by the
        second attempt propagates to the caller.
        """
        try:
            return x(*args)
        except (TimeoutException, NoSuchElementException):
            self.solveCaptcha()
            return x(*args)

    def solveCaptcha(self):
        """Screenshot the captcha, send it to the solving service and submit the answer.

        Calls itself again if the captcha image is still present after the
        answer was submitted.
        """
        print('handling captcha')
        self.wait()
        body = self.driver.find_element(By.CSS_SELECTOR, 'body')
        self.driver.switch_to.default_content()
        body.click()
        self.wait()

        e = self.driver.find_element(By.CSS_SELECTOR, '#captcha-image')
        image = self.pullCaptcha(e, "captcha.png")
        key = self.postCaptcha(image)
        self.wait()
        answer = self.getCaptcha(key)
        if answer is None:
            # getCaptcha() already refreshed the captcha and retried.
            return

        self.captchaClicks(answer)
        if self.driver.find_elements(By.CSS_SELECTOR, '#captcha-image'):
            print('captcha was wrong')
            self.solveCaptcha()

    def pullCaptcha(self, element, path, box=(530, 405, 635, 440)):
        """Save a page screenshot to ``path``, crop it to ``box`` and return the PNG bytes.

        Args:
            element: the captcha image element. It is kept for interface
                compatibility; the crop uses ``box``, not the element position.
            path: file the screenshot and then the cropped image are written to.
            box: ``(left, upper, right, lower)`` crop rectangle in screenshot
                pixels. The default is the fixed region the original code used.
        """
        # saves screenshot of entire page
        self.driver.save_screenshot(path)

        cropped = Image.open(path).crop(box)
        cropped.save(path, format='png')  # overwrite with the cropped image

        with open(path, "rb") as image_file:
            return image_file.read()

    def postCaptcha(self, image):
        """Upload the captcha image and return the service's request id.

        Raises:
            Exception: if the response text contains ``ERROR``.
        """
        params = {'json': 1, 'key': self.settings["captchaKey"],
                  'method': 'post', 'calc': 1}
        files = {'file': ('file.png', image, 'image/png', {'Expires': '0'})}
        response = requests.post(self.settings["captchaPOST"], files=files,
                                 timeout=10, params=params)
        print(response.text)
        if 'ERROR' in response.text:
            raise Exception(f'Captcha Error: {response.text}')

        return response.json()['request']

    def getCaptcha(self, i):
        """Poll the solving service for request ``i`` and return the answer.

        Returns ``None`` when the answer was unusable; in that case the
        captcha has already been refreshed and ``solveCaptcha()`` re-run.

        Raises:
            Exception: if the response text contains ``ERROR`` (checked after
                the ``UNSOLVABLE`` case).
        """
        print(i)
        params = {'json': 1, 'key': self.settings["captchaKey"],
                  'action': 'get', 'id': i}
        while True:
            response = requests.get(self.settings["captchaGET"],
                                    timeout=10, params=params)
            text = response.text
            print(text)
            # The order of these checks matters: UNSOLVABLE must be handled
            # before the generic ERROR check.
            if 'UNSOLVABLE' in text or '+' in text or '-' in text:
                return self._retryCaptcha()
            if 'ERROR' in text:
                raise Exception(f'Captcha Error: {text}')
            if '=' in text:
                return self._retryCaptcha()
            if 'NOT_READY' in text:
                time.sleep(10)
                continue
            break

        a = response.json()['request']
        print(a)
        return a

    def _retryCaptcha(self):
        """Request a fresh captcha image and start solving again; returns None."""
        self.selectAndClick('#captcha-refresh')
        self.solveCaptcha()

    def selectAndClick(self, s):
        """Find the element matching CSS selector ``s``, click it and wait."""
        e = self.handleCaptcha(self.driver.find_element, By.CSS_SELECTOR, s)
        self.handleCaptcha(e.click)
        self.wait()

    def captchaClicks(self, answer):
        """Type ``answer`` into the captcha box and press the validate button."""
        # Last wrapper is needed in case the solution is wrong, other wrappers are extra
        inputBox = self.handleCaptcha(
            self.driver.find_element, By.CSS_SELECTOR, '#Attempt')
        self.handleCaptcha(inputBox.clear)
        self.handleCaptcha(inputBox.send_keys, answer)
        self.wait()
        btn = self.handleCaptcha(
            self.driver.find_element, By.CSS_SELECTOR,
            '.action-validate-captcha.originButtonCompact.ui-priority-primary')
        self.handleCaptcha(btn.click)

    def realClick(self, target, req):
        """Hover over ``req``, then move to ``target`` and click it.

        Used for when the site needs special click event combinations.
        """
        ActionChains(self.driver).move_to_element(req).perform()
        ActionChains(self.driver).move_to_element(target).click(target).perform()

    def getData(self):
        """Run download batches until ``endPosition`` or ``interval`` batches are done."""
        if self.settings["completed"] >= self.settings["endPosition"]:
            print("Already at end position")
            self.gracefulQuit()
            return

        # Clear selects that may have been left by the last instance
        self.driver.get(self.settings['entry'][1])
        self.insertPosition(self.settings['completed'])
        self.selectRecords(False)

        p = 0
        while (self.settings["completed"] < self.settings["endPosition"]
               and p < self.settings["interval"]):
            self.clickSequence()
            self.saveCompleted()
            time.sleep(self.waitTime)
            p += 1

    def insertPosition(self, p):
        """Type position ``p`` into the page box and press Return.

        Retries until it succeeds whenever the page re-renders underneath us
        (stale or non-interactable element). This is a loop rather than
        recursion so a long run of retries cannot exhaust the call stack.
        """
        while True:
            try:
                pageBox = self.handleCaptcha(
                    self.driver.find_element, By.CSS_SELECTOR,
                    'div.page.click-enterkey:not(.disabled)')
                self.wait()
                pageBox.click()
                self.wait()
                inputBox = self.handleCaptcha(
                    self.driver.find_element, By.CSS_SELECTOR,
                    'input:not(#Attempt)')
                self.wait()
                self.handleCaptcha(inputBox.send_keys, p)
                self.handleCaptcha(inputBox.send_keys, Keys.RETURN)
                return
            except (StaleElementReferenceException, ElementNotInteractableException):
                self.wait()
                self.wait()

    def wait(self):
        """Sleep for the configured ``waitTime`` seconds."""
        time.sleep(self.settings['waitTime'])

    def selectRecords(self, ifSelect):
        """Tick (``ifSelect`` true) or untick the select-all box on 10 consecutive pages.

        This may create a bug where some records are missed because of a
        captcha prompt.
        """
        for _ in range(10):
            self.wait()
            allButton = self.handleCaptcha(
                self.driver.find_element, By.CSS_SELECTOR,
                '#checkboxCol.action-tag-all')
            checked = allButton.get_attribute('checked')
            # Click only when the current state differs from the wanted one.
            if (checked is None) == ifSelect:
                self.handleCaptcha(allButton.click)
            self.wait()

            nextButton = self.handleCaptcha(
                self.driver.find_element, By.CSS_SELECTOR, ".next.button")
            self.handleCaptcha(nextButton.click)

    def clickSequence(self):
        """Select one batch, download it, deselect it and advance ``completed`` by 10."""
        url = self.settings["entry"][1]
        self.driver.get(url)
        startPosition = str(self.settings["completed"] + 1)

        self.handleCaptcha(self.insertPosition, startPosition)
        time.sleep(3)
        self.selectRecords(True)

        e = self.handleCaptcha(self.driver.find_element, By.CSS_SELECTOR,
                               "a.download.action-download")
        e.click()
        self.wait()
        e = self.handleCaptcha(self.driver.find_element, By.CSS_SELECTOR,
                               "#detailDetail")
        e.click()
        e = self.handleCaptcha(
            self.driver.find_element, By.CSS_SELECTOR,
            '.originButton.ui-priority-primary.view-on-exportlimit.view-on-randomsample.action-download')
        e.click()
        self.wait()

        # Records need to be deselected to select new ones
        self.driver.get(url)
        self.handleCaptcha(self.insertPosition, startPosition)
        self.selectRecords(False)

        self.settings["completed"] = self.settings["completed"] + 10
        print(f'completed: {self.settings["completed"]}')

    def cleanSelects(self):
        """Deselect records between ``cleanMode[1]`` and ``cleanMode[2]`` if enabled.

        Does nothing when ``cleanMode[0]`` is falsy.
        """
        cleanMode = self.settings["cleanMode"]
        if not cleanMode[0]:
            return
        start = cleanMode[1]
        stop = cleanMode[2]

        self.driver.get(self.settings["entry"][1])
        self.handleCaptcha(self.insertPosition, str(start))

        # This will have side effects
        # Each selectRecords() call covers 10 pages. Use the absolute distance:
        # the original ``start - stop`` is negative (no iterations at all)
        # whenever stop is greater than start.
        dif = int(abs(start - stop) / 10)
        for _ in range(dif):
            self.selectRecords(False)

    def saveCompleted(self):
        """Write the current settings (including progress) back to the config file."""
        # Serialise before opening so a serialisation error cannot leave the
        # config file truncated.
        payload = json.dumps(self.settings, indent=4, sort_keys=True)
        with open(self.configPath, 'w', encoding='utf-8') as login_file:
            login_file.write(payload)

    def gracefulQuit(self):
        """Persist progress before stopping. The browser itself is not closed here."""
        self.saveCompleted()
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Build the crawler; Spider.__init__ ends by calling start(), so this runs the crawl."""
    return Spider()


if __name__ == "__main__":
    spider = main()
|
|
|
|