forked from apache/incubator-stormcrawler
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enhancement: Selenium Grid Capability
Signed-off-by: Maimur Hasan <msghasan@gmail.com>
- Loading branch information
Showing
5 changed files
with
326 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
118 changes: 118 additions & 0 deletions
118
core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/** | ||
* Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE | ||
* file distributed with this work for additional information regarding copyright ownership. | ||
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.digitalpebble.stormcrawler.protocol.selenium; | ||
|
||
import com.digitalpebble.stormcrawler.util.ConfUtils; | ||
import java.net.URL; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Timer; | ||
import java.util.TimerTask; | ||
import org.apache.storm.Config; | ||
import org.openqa.selenium.Dimension; | ||
import org.openqa.selenium.WebDriver; | ||
import org.openqa.selenium.remote.DesiredCapabilities; | ||
import org.openqa.selenium.remote.RemoteWebDriver; | ||
|
||
public class SeleniumGridImpl extends SeleniumGridProtocol { | ||
|
||
private TimerTask timerTask; | ||
private Timer updateQueue; | ||
private int noOfWorkers = 0; | ||
private String gridAddress; | ||
private final DesiredCapabilities capabilities = new DesiredCapabilities(); | ||
|
||
@Override | ||
public void configure(Config conf) { | ||
super.configure(conf); | ||
noOfWorkers = ConfUtils.getInt(conf, "topology.workers", 2); | ||
gridAddress = super.gridAddress; | ||
capabilities.setBrowserName( | ||
ConfUtils.getString(conf, "selenium.capabilities.browserName", "chrome")); | ||
capabilities.setCapability("newSessionWaitTimeout", 600); | ||
capabilities.setCapability("browserTimeout", 600); | ||
updateQueue = new Timer(); | ||
updateQueueOfBrowsers(); | ||
} | ||
|
||
protected void updateQueueOfBrowsers() { | ||
timerTask = | ||
new TimerTask() { | ||
@Override | ||
public void run() { | ||
try { | ||
List list = getAllNodesList(); | ||
LOG.info("Blocking Queue size: " + driversQueue.size()); | ||
while (getSessionsCount(list) > 0) { | ||
int size = list.size(); | ||
// Check if the queue size is more than the actual | ||
// no of browsers allowed per worker | ||
// means crawler services are idle because all drivers are in queue | ||
if (driversQueue.size() >= (size / noOfWorkers)) { | ||
SeleniumGridProtocol.Holder holder = driversQueue.take(); | ||
long totalTime = System.currentTimeMillis() - holder.getTime(); | ||
// clear the queue if the total time spent in the queue is more | ||
// than 4.5 minutes | ||
// As after 5 mintues the driver would be idle and will throw | ||
// exception | ||
// while we try to use that driver for fetching | ||
if (totalTime > 1000 * 4.5 * 60) { | ||
driversQueue.clear(); | ||
} | ||
} | ||
// so that browsers get equally divided among workers | ||
if (driversQueue.size() <= size / noOfWorkers) { | ||
RemoteWebDriver driver = getDriverFromNode(); | ||
if (driver != null) { | ||
driversQueue.put( | ||
new SeleniumGridProtocol.Holder( | ||
driver, System.currentTimeMillis())); | ||
LOG.info( | ||
"Placed driver in blocking queue: " | ||
+ driversQueue.size()); | ||
} | ||
} | ||
list = getAllNodesList(); | ||
} | ||
} catch (Exception e) { | ||
LOG.error( | ||
"Exception while running task for adding driver to the queue", | ||
e); | ||
} | ||
} | ||
}; | ||
// update the queue every 5 minutes | ||
updateQueue.schedule(timerTask, 0 * 60 * 1000, 5 * 60 * 1000); | ||
} | ||
|
||
protected RemoteWebDriver getDriverFromNode() { | ||
int sessionCount = 0; | ||
RemoteWebDriver driver = null; | ||
|
||
try { | ||
LOG.debug("Adding new driver from " + gridAddress); | ||
driver = new RemoteWebDriver(new URL(gridAddress), capabilities); | ||
WebDriver.Timeouts touts = driver.manage().timeouts(); | ||
WebDriver.Window window = driver.manage().window(); | ||
touts.implicitlyWait(Duration.ofSeconds(6)); | ||
touts.pageLoadTimeout(Duration.ofSeconds(60)); | ||
touts.scriptTimeout(Duration.ofSeconds(30)); | ||
window.setSize(new Dimension(1980, 1280)); | ||
LOG.debug("Inside getDriverFromGrid to set web drivers" + driver.hashCode()); | ||
} catch (Exception e) { | ||
} | ||
return driver; | ||
} | ||
} |
194 changes: 194 additions & 0 deletions
194
.../src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridProtocol.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
/** | ||
* Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE | ||
* file distributed with this work for additional information regarding copyright ownership. | ||
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.digitalpebble.stormcrawler.protocol.selenium; | ||
|
||
import com.digitalpebble.stormcrawler.Metadata; | ||
import com.digitalpebble.stormcrawler.protocol.AbstractHttpProtocol; | ||
import com.digitalpebble.stormcrawler.protocol.HttpHeaders; | ||
import com.digitalpebble.stormcrawler.protocol.ProtocolResponse; | ||
import com.digitalpebble.stormcrawler.util.ConfUtils; | ||
import com.google.gson.Gson; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.io.Reader; | ||
import java.net.URL; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import org.apache.storm.Config; | ||
import org.openqa.selenium.remote.DesiredCapabilities; | ||
import org.openqa.selenium.remote.RemoteWebDriver; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public abstract class SeleniumGridProtocol extends AbstractHttpProtocol { | ||
protected static final org.slf4j.Logger LOG = | ||
LoggerFactory.getLogger(SeleniumGridProtocol.class); | ||
protected static LinkedBlockingQueue<Holder> driversQueue; | ||
private NavigationFilters filters; | ||
private final DesiredCapabilities capabilities = new DesiredCapabilities(); | ||
|
||
protected String gridAddress; | ||
|
||
@Override | ||
public void configure(Config conf) { | ||
super.configure(conf); | ||
driversQueue = new LinkedBlockingQueue<>(); | ||
filters = NavigationFilters.fromConf(conf); | ||
gridAddress = | ||
ConfUtils.getString(conf, "selenium.grid.address", "http://localhost:4444/wd/hub"); | ||
} | ||
|
||
protected synchronized List<Map<String, Object>> getAllNodesList() throws IOException { | ||
Map<String, Object> valueMap = null; | ||
boolean ready = false; | ||
while (!ready) { | ||
Map<String, Object> map = getStatusStream(); | ||
valueMap = (Map<String, Object>) map.get("value"); | ||
ready = (boolean) valueMap.get("ready"); | ||
if (!ready) { | ||
LOG.warn("Selenium Grid is not ready yet"); | ||
} | ||
} | ||
LOG.info("Grid Is Ready to Serve"); | ||
return (List<Map<String, Object>>) valueMap.get("nodes"); | ||
} | ||
|
||
private Map<String, Object> getStatusStream() throws IOException { | ||
Gson gson = new Gson(); | ||
URL url = new URL(gridAddress + "/status"); | ||
InputStream stream = url.openStream(); | ||
Reader reader = new InputStreamReader(stream); | ||
Map<String, Object> map = gson.fromJson(reader, Map.class); | ||
stream.close(); | ||
return map; | ||
} | ||
|
||
protected int getSessionsCount(List<Map<String, Object>> nodes) { | ||
int availableSessions = 0; | ||
for (Map<String, Object> node : nodes) { | ||
List<Map<String, Object>> slots = (List<Map<String, Object>>) node.get("slots"); | ||
for (Map<String, Object> slot : slots) { | ||
if (slot.get("session") == null) { | ||
availableSessions++; | ||
} | ||
} | ||
} | ||
return availableSessions; | ||
} | ||
|
||
public class Holder { | ||
public RemoteWebDriver driver; | ||
public Long time; | ||
|
||
public void setDriver(RemoteWebDriver driver) { | ||
this.driver = driver; | ||
} | ||
|
||
public void setTime(Long time) { | ||
this.time = time; | ||
} | ||
|
||
public RemoteWebDriver getDriver() { | ||
return this.driver; | ||
} | ||
|
||
public Long getTime() { | ||
return this.time; | ||
} | ||
|
||
public Holder(RemoteWebDriver driver, Long time) { | ||
this.driver = driver; | ||
this.time = time; | ||
} | ||
} | ||
|
||
public ProtocolResponse getProtocolOutput(String url, Metadata metadata) throws Exception { | ||
RemoteWebDriver driver; | ||
while ((driver = getDriver()) == null) {} | ||
try { | ||
// This will block for the page load and any | ||
// associated AJAX requests | ||
driver.get(url); | ||
|
||
String u = driver.getCurrentUrl(); | ||
|
||
// call the filters | ||
ProtocolResponse response = filters.filter(driver, metadata); | ||
if (response != null) { | ||
return response; | ||
} | ||
|
||
// if the URL is different then we must have hit a redirection | ||
if (!u.equalsIgnoreCase(url)) { | ||
byte[] content = new byte[] {}; | ||
Metadata m = new Metadata(); | ||
m.addValue(HttpHeaders.LOCATION, u); | ||
return new ProtocolResponse(content, 307, m); | ||
} | ||
|
||
// if no filters got triggered | ||
byte[] content = driver.getPageSource().getBytes(); | ||
return new ProtocolResponse(content, 200, new Metadata()); | ||
|
||
} catch (Exception e) { | ||
if (e.getMessage() != null) { | ||
if ((e.getMessage().contains("ERR_NAME_NOT_RESOLVED") | ||
|| e.getMessage().contains("ERR_CONNECTION_REFUSED") | ||
|| e.getMessage().contains("ERR_CONNECTION_CLOSED") | ||
|| e.getMessage().contains("ERR_SSL_PROTOCOL_ERROR") | ||
|| e.getMessage().contains("ERR_CONNECTION_RESET") | ||
|| e.getMessage().contains("ERR_SSL_VERSION_OR_CIPHER_MISMATCH") | ||
|| e.getMessage().contains("ERR_ADDRESS_UNREACHABLE"))) { | ||
LOG.info( | ||
"Exception is of webpage related hence continuing with the driver and adding it back to queue"); | ||
} else { | ||
LOG.error( | ||
"Exception wile doing operation via driver url {} with driver hashcode {}" | ||
+ "with excepiton {}", | ||
url, | ||
driver.hashCode(), | ||
e); | ||
closeConnectionGracefully(driver); | ||
driver = null; | ||
} | ||
} | ||
throw new Exception(e); | ||
} finally { | ||
// finished with this driver - return it to the queue | ||
if (driver != null) driversQueue.put(new Holder(driver, System.currentTimeMillis())); | ||
} | ||
} | ||
|
||
private final RemoteWebDriver getDriver() { | ||
try { | ||
return driversQueue.take().getDriver(); | ||
} catch (Exception e) { | ||
return null; | ||
} | ||
} | ||
|
||
private void closeConnectionGracefully(RemoteWebDriver driver) { | ||
try { | ||
LOG.info("Before disposing driver : {}", driver.hashCode()); | ||
|
||
if (driver.getSessionId() != null) { | ||
if (driver.getSessionId().toString() != null) driver.quit(); | ||
} | ||
} catch (Exception e) { | ||
LOG.info("Error while closing driver", e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters