-
Notifications
You must be signed in to change notification settings - Fork 621
/
Copy pathbenchmark_spark.py
120 lines (96 loc) · 3.59 KB
/
benchmark_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Compile spark with native blas support:
# https://github.com/Mega-DatA-Lab/SpectralLDA-Spark/wiki/Compile-Spark-with-Native-BLAS-LAPACK-Support
import argparse
import json
import time
import matplotlib.pyplot as plt
import numpy
import scipy.io
import seaborn
from pyspark import SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row, SparkSession
import implicit
def convert_sparse_to_dataframe(spark, context, sparse_matrix):
    """Convert a scipy sparse matrix to a Spark DataFrame.

    Parameters
    ----------
    spark : SparkSession used to create the DataFrame
    context : SparkContext used to parallelize the raw triples
    sparse_matrix : any scipy sparse matrix; converted to COO form

    Returns
    -------
    A DataFrame with integer columns ``row``/``col`` and a float ``data``
    column, one row per stored nonzero.
    """
    m = sparse_matrix.tocoo()
    # numSlices must be a positive integer; the original float division
    # (len(m.row) / 1024) relied on implicit coercion and produced a
    # fractional/zero slice count for matrices with fewer than 1024 nonzeros.
    num_slices = max(1, len(m.row) // 1024)
    data = context.parallelize(numpy.array([m.row, m.col, m.data]).T, numSlices=num_slices)
    return spark.createDataFrame(
        data.map(lambda p: Row(row=int(p[0]), col=int(p[1]), data=float(p[2])))
    )
def benchmark_spark(ratings, factors, iterations=5):
    """Time Spark MLlib's implicit-feedback ALS at each factor count.

    Parameters
    ----------
    ratings : scipy sparse matrix of user/item interactions
    factors : iterable of ranks to benchmark
    iterations : number of ALS iterations per fit

    Returns
    -------
    dict mapping rank -> average seconds per iteration.
    """
    spark_conf = (
        SparkConf()
        .setAppName("implicit_benchmark")
        .setMaster("local[*]")
        .set("spark.driver.memory", "16G")
    )
    context = SparkContext(conf=spark_conf)
    session = SparkSession(context)

    timings = {}
    try:
        ratings = convert_sparse_to_dataframe(session, context, ratings)
        for rank in factors:
            model = ALS(
                rank=rank,
                maxIter=iterations,
                alpha=1,
                implicitPrefs=True,
                userCol="row",
                itemCol="col",
                ratingCol="data",
            )
            start = time.time()
            model.fit(ratings)
            elapsed = time.time() - start
            # report per-iteration time so results are comparable across runs
            timings[rank] = elapsed / iterations
            print(f"spark. factors={rank} took {elapsed / iterations:.3f}")
    finally:
        # always tear down the local Spark session, even if a fit fails
        session.stop()
    return timings
def benchmark_implicit(ratings, factors, iterations=5, use_gpu=False):
    """Time the implicit library's ALS at each factor count.

    Parameters
    ----------
    ratings : scipy sparse matrix; converted to CSR before fitting
    factors : iterable of ranks to benchmark
    iterations : number of ALS iterations per fit
    use_gpu : whether to run the GPU implementation

    Returns
    -------
    dict mapping rank -> average seconds per iteration.
    """
    ratings = ratings.tocsr()
    timings = {}
    for num_factors in factors:
        als = implicit.als.AlternatingLeastSquares(
            factors=num_factors, iterations=iterations, use_gpu=use_gpu
        )
        start = time.time()
        als.fit(ratings)
        duration = time.time() - start
        # take average time over iterations to be consistent with spark timings
        timings[num_factors] = duration / iterations
        print(f"implicit. factors={num_factors} took {duration / iterations:.3f}")
    return timings
def generate_graph(times, factors, filename="spark_speed.png"):
    """Plot seconds-per-iteration versus factor count for each benchmark series.

    Parameters
    ----------
    times : dict mapping series label -> {factor: seconds per iteration}
    factors : list of factor counts used as x-axis values (ascending)
    filename : path of the image file to write
    """
    seaborn.set()
    fig, ax = plt.subplots()
    for label, series in times.items():
        current = [series[f] for f in factors]
        ax.plot(factors, current, marker="o", markersize=6)
        # label each line at its right-hand end instead of using a legend
        ax.text(factors[-1] + 5, current[-1], label, fontsize=10)
    ax.set_ylabel("Seconds per Iteration")
    ax.set_xlabel("Factors")
    fig.savefig(filename, bbox_inches="tight", dpi=300)
    # close the figure so repeated calls don't accumulate open figures
    # (pyplot keeps every figure alive until explicitly closed)
    plt.close(fig)
def main():
    """Parse CLI arguments, run all three benchmarks, and save the results."""
    parser = argparse.ArgumentParser(
        description="Benchmark Spark against implicit",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--input", type=str, required=True, help="dataset file in matrix market format"
    )
    parser.add_argument("--output", type=str, required=True, help="output file location")
    args = parser.parse_args()

    ratings = scipy.io.mmread(args.input)
    # benchmark ranks 64, 128, 192, 256
    factors = list(range(64, 257, 64))

    # dict literal values are evaluated in order: GPU, Spark, CPU
    times = {
        "Implicit (GPU)": benchmark_implicit(ratings, factors, use_gpu=True),
        "Spark MLlib": benchmark_spark(ratings, factors),
        "Implicit (CPU)": benchmark_implicit(ratings, factors, use_gpu=False),
    }
    print(times)

    generate_graph(times, factors, filename=args.output + ".png")
    with open(args.output + ".json", "w", encoding="utf8") as o:
        json.dump(times, o)


if __name__ == "__main__":
    main()