Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark patch #139

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions spark/.gitkeep
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this file could be removed

136 changes: 136 additions & 0 deletions spark/ClickBenchRunner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import org.apache.spark.sql.{SparkSession, SQLContext}
import java.sql.Statement
import org.apache.spark.sql.types._

val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("DROP TABLE IF EXISTS hits")

val schema = StructType(
List(
StructField("WatchID", LongType, nullable = false),
StructField("JavaEnable", ShortType, nullable = false),
StructField("Title", StringType, nullable = false),
StructField("GoodEvent", ShortType, nullable = false),
StructField("EventTime", TimestampType, nullable = false),
StructField("EventDate", DateType, nullable = false),
StructField("CounterID", IntegerType, nullable = false),
StructField("ClientIP", IntegerType, nullable = false),
StructField("RegionID", IntegerType, nullable = false),
StructField("UserID", LongType, nullable = false),
StructField("CounterClass", ShortType, nullable = false),
StructField("OS", ShortType, nullable = false),
StructField("UserAgent", ShortType, nullable = false),
StructField("URL", StringType, nullable = false),
StructField("Referer", StringType, nullable = false),
StructField("IsRefresh", ShortType, nullable = false),
StructField("RefererCategoryID", ShortType, nullable = false),
StructField("RefererRegionID", IntegerType, nullable = false),
StructField("URLCategoryID", ShortType, nullable = false),
StructField("URLRegionID", IntegerType, nullable = false),
StructField("ResolutionWidth", ShortType, nullable = false),
StructField("ResolutionHeight", ShortType, nullable = false),
StructField("ResolutionDepth", ShortType, nullable = false),
StructField("FlashMajor", ShortType, nullable = false),
StructField("FlashMinor", ShortType, nullable = false),
StructField("FlashMinor2", StringType, nullable = false),
StructField("NetMajor", ShortType, nullable = false),
StructField("NetMinor", ShortType, nullable = false),
StructField("UserAgentMajor", ShortType, nullable = false),
StructField("UserAgentMinor", StringType, nullable = false),
StructField("CookieEnable", ShortType, nullable = false),
StructField("JavascriptEnable", ShortType, nullable = false),
StructField("IsMobile", ShortType, nullable = false),
StructField("MobilePhone", ShortType, nullable = false),
StructField("MobilePhoneModel", StringType, nullable = false),
StructField("Params", StringType, nullable = false),
StructField("IPNetworkID", IntegerType, nullable = false),
StructField("TraficSourceID", ShortType, nullable = false),
StructField("SearchEngineID", ShortType, nullable = false),
StructField("SearchPhrase", StringType, nullable = false),
StructField("AdvEngineID", ShortType, nullable = false),
StructField("IsArtifical", ShortType, nullable = false),
StructField("WindowClientWidth", ShortType, nullable = false),
StructField("WindowClientHeight", ShortType, nullable = false),
StructField("ClientTimeZone", ShortType, nullable = false),
StructField("ClientEventTime", TimestampType, nullable = false),
StructField("SilverlightVersion1", ShortType, nullable = false),
StructField("SilverlightVersion2", ShortType, nullable = false),
StructField("SilverlightVersion3", IntegerType, nullable = false),
StructField("SilverlightVersion4", ShortType, nullable = false),
StructField("PageCharset", StringType, nullable = false),
StructField("CodeVersion", IntegerType, nullable = false),
StructField("IsLink", ShortType, nullable = false),
StructField("IsDownload", ShortType, nullable = false),
StructField("IsNotBounce", ShortType, nullable = false),
StructField("FUniqID", LongType, nullable = false),
StructField("OriginalURL", StringType, nullable = false),
StructField("HID", IntegerType, nullable = false),
StructField("IsOldCounter", ShortType, nullable = false),
StructField("IsEvent", ShortType, nullable = false),
StructField("IsParameter", ShortType, nullable = false),
StructField("DontCountHits", ShortType, nullable = false),
StructField("WithHash", ShortType, nullable = false),
StructField("HitColor", StringType, nullable = false),
StructField("LocalEventTime", TimestampType, nullable = false),
StructField("Age", ShortType, nullable = false),
StructField("Sex", ShortType, nullable = false),
StructField("Income", ShortType, nullable = false),
StructField("Interests", ShortType, nullable = false),
StructField("Robotness", ShortType, nullable = false),
StructField("RemoteIP", IntegerType, nullable = false),
StructField("WindowName", IntegerType, nullable = false),
StructField("OpenerName", IntegerType, nullable = false),
StructField("HistoryLength", ShortType, nullable = false),
StructField("BrowserLanguage", StringType, nullable = false),
StructField("BrowserCountry", StringType, nullable = false),
StructField("SocialNetwork", StringType, nullable = false),
StructField("SocialAction", StringType, nullable = false),
StructField("HTTPError", ShortType, nullable = false),
StructField("SendTiming", IntegerType, nullable = false),
StructField("DNSTiming", IntegerType, nullable = false),
StructField("ConnectTiming", IntegerType, nullable = false),
StructField("ResponseStartTiming", IntegerType, nullable = false),
StructField("ResponseEndTiming", IntegerType, nullable = false),
StructField("FetchTiming", IntegerType, nullable = false),
StructField("SocialSourceNetworkID", ShortType, nullable = false),
StructField("SocialSourcePage", StringType, nullable = false),
StructField("ParamPrice", LongType, nullable = false),
StructField("ParamOrderID", StringType, nullable = false),
StructField("ParamCurrency", StringType, nullable = false),
StructField("ParamCurrencyID", ShortType, nullable = false),
StructField("OpenstatServiceName", StringType, nullable = false),
StructField("OpenstatCampaignID", StringType, nullable = false),
StructField("OpenstatAdID", StringType, nullable = false),
StructField("OpenstatSourceID", StringType, nullable = false),
StructField("UTMSource", StringType, nullable = false),
StructField("UTMMedium", StringType, nullable = false),
StructField("UTMCampaign", StringType, nullable = false),
StructField("UTMContent", StringType, nullable = false),
StructField("UTMTerm", StringType, nullable = false),
StructField("FromTag", StringType, nullable = false),
StructField("HasGCLID", ShortType, nullable = false),
StructField("RefererHash", LongType, nullable = false),
StructField("URLHash", LongType, nullable = false),
StructField("CLID", IntegerType, nullable = false))
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no way to create an index, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


val startTable = System.nanoTime()
val df = spark.read.schema(schema).option("header", "false").option("sep", "\t").csv("/hits.tsv")
df.createOrReplaceTempView("hits")
val endTable = System.nanoTime()
val timeElapsedTable = (endTable - startTable) / 1000000
println(s"Creating table time: $timeElapsedTable ms")

val queries = Source.fromFile("queries.sql").getLines().toList
var itr: Int = 0
queries.foreach(query => {
val start = System.nanoTime()
val result = spark.sql(query)
val end = System.nanoTime()
val timeElapsed = (end - start) / 1000000
println(s"Query $itr | Time: $timeElapsed ms")
itr += 1
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls upload the results

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uploaded in log.txt file


spark.stop()
System.exit(0)
16 changes: 16 additions & 0 deletions spark/benchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

sudo apt-get update
sudo apt-get -y install openjdk-8-jdk-headless

# For Spark3.0.1 installation:
# wget --continue https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
DoubleMindy marked this conversation as resolved.
Show resolved Hide resolved
# tar -xzf spark-3.0.1-bin-hadoop2.7.tgz
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tar -xzf spark*

# mv spark-3.0.1-bin-hadoop2.7 spark

wget --continue 'https://datasets.clickhouse.com/hits_compatible/hits.tsv.gz'
#gzip -d hits.tsv.gz
chmod 777 ~ hits.tsv
$HADOOP_HOME/bin/hdfs dfs -put hits.tsv /
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how do I set this variable?

$ echo $HADOOP_HOME

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find it:

find spark-3.5.0-bin-hadoop3 -name hdfs


$SPARK_HOME/bin/spark-shell --master local -i ClickBenchRunner.scala > log.txt
43 changes: 43 additions & 0 deletions spark/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
SELECT COUNT(*) FROM hits;
SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0;
SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits;
SELECT AVG(UserID) FROM hits;
SELECT COUNT(DISTINCT UserID) FROM hits;
SELECT COUNT(DISTINCT SearchPhrase) FROM hits;
SELECT MIN(EventDate), MAX(EventDate) FROM hits;
SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10;
SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID FROM hits WHERE UserID = 435090932899640449;
SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%';
SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits;
SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10;
SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10;
SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10;
SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the OFFSET clause was removed, which is incorrect.

It should be either LIMIT 1010 to get the closest result or subqueries with ROW_NUMBER.