Distributed SPARQL Evaluators
Abstract:
In an attempt to shed light on
the performance and limitations of current distributed
SPARQL evaluators, we evaluate a panel of 10
state-of-the-art implementations, which we benchmark on a
common basis, and according to metrics and requirements that
are instrumental in several use cases that we identified. We
report on a detailed comparative analysis, using aspects
such as velocity, resource consumption, reaction to data
updates, etc. Our analysis pinpoints the advantages
and limitations of current systems for the efficient
evaluation of SPARQL queries on a commodity cluster.
In order to compare the various systems, we first recall the standard metrics that are relevant when focusing on performance:
- Loading Time: measures the time taken by the system to load a set of triples in its own store.
- Query Response Time: measures the total time spent to evaluate SPARQL queries.
- Memory Usage: measures the disk footprint of the loaded dataset, including indices and any auxiliary data structures.
- Network Traffic: measures how the workers use the cluster network to communicate with each other.
- CPU Load Balancing: indicates how the computational load is distributed across the machines of the cluster.
- Scalability: measures how response times evolve when additional nodes are added to the system (ideally, they should decrease).
- Resiliency: measures how the performance of the system is affected when one or several nodes fail.
We present here an extension of the experiments reported in our paper: results on additional datasets are included, and they confirm the conclusions drawn in the article.
We show here:
- The various competitors we looked at, and their configurations.
- Experimental details, such as the protocol and the machines used.
- Results using large datasets: Lubm1k (more than 100 million triples) and Lubm10k (more than one billion triples).
4store
4store is a native triplestore with its own storage logic. It shards the data equally among the machines of the cluster; the data are indexed and written to disk. At query time, the driver parses the query, delegates its resolution to the workers, and then aggregates the intermediate results they return to produce the final result.
Installation
Execute the following commands:
wget http://repo.sparql.pro/centos/sparql-pro-1.0.0-1.noarch.rpm ; wget http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm ; rpm -ivh sparql-pro-1.0.0-1.noarch.rpm ; rpm -ivh epel-release-6-8.noarch.rpm ; yum install 4store ;
Configuration
- Start a worker on each machine of the cluster:
4s-boss
- Setup a triples backend on each machine of the cluster:
4s-backend-setup --cluster <numbMachines> --node <nbMachine> --segments <numSeg> <backendName>
- <numbMachines> stands for the total number of machines in the cluster.
- <nbMachine> stands for the current machine number, beginning from 0.
- <numSeg> stands for the number of segments each machine on the cluster will hold.
- <backendName> stands for the backend's name.
- On a single machine, which will be the driver, start the freshly created backend:
4s-admin start-stores <backendName>
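For reference, once the stores are started, data can be imported and queried from the driver with 4store's standard command-line tools; the dataset path and the query below are only illustrative:
4s-import <backendName> /path/to/Triples.nt
4s-query <backendName> 'SELECT * WHERE { ?s ?p ?o } LIMIT 10'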
Notes
- As stated in 4store documentation, there has to be "twice as many segments as there are physical CPUs or CPU cores on the system" (round to the nearest power of two).
In our case, we have 10 machines with 4 CPUs each, so we chose a number of segments equal to 64 (a concrete setup command is given below).
- The version of 4store we use is 1.1.5. Official website.
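Concretely, with our 10 machines and 64 segments, and assuming a backend named lubm (an illustrative name), the setup command on machine 0 reads as follows (on machine 1, use --node 1, and so on):
4s-backend-setup --cluster 10 --node 0 --segments 64 lubm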
Experiments
CumulusRDF
CumulusRDF is a "Proof of Concept" using an HTable-like database layout: it relies on Cassandra for storage and on OpenRDF Sesame as a SPARQL evaluator.
Dependencies
- Cassandra cluster (version used: 1.2.19)
- OpenRDF Sesame (embedded into client jar)
Installation
We used the version of Cassandra available in the DataStax packages repository. More information on how to append this repository to the CentOS repositories list is available here.
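For reference, a repository definition along the following lines can be placed in /etc/yum.repos.d/; the URL and fields follow DataStax's documentation of that period and are given here as an assumption, since they may have changed since:
cat > /etc/yum.repos.d/datastax.repo <<'EOF'
[datastax]
name = DataStax Repo for Apache Cassandra
baseurl = http://rpm.datastax.com/community
enabled = 1
gpgcheck = 0
EOF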
Then, you can simply install all the needed packages by executing:
yum update
Finally:
yum install dsc12
Configuration
The cluster is configured to take into account the fact that the 10 virtual machines composing it are spread over 2 physical machines: we declared the two groups of virtual machines as 2 distinct racks (see Snitches).
After that, no further configuration is needed. Just launch the triples import with:
java -cp cumulusrdf-1.0.1.jar edu.kit.aifb.cumulus.cli.Main Load -i /path/to/input/file -k <databaseName> -s triple
Note that the database should not already exist. If it does, delete it, change its name, or create another one. These operations can be done using cqlsh (documentation here).
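For instance, assuming the -k <databaseName> parameter corresponds to the Cassandra keyspace, the database can be removed by opening a cqlsh session on one of the nodes and issuing a DROP KEYSPACE statement (the second line is typed at the cqlsh prompt):
cqlsh <nodeAddress>
DROP KEYSPACE <databaseName>;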
Then, you can query the database with:
java -cp cumulusrdf-1.0.1.jar edu.kit.aifb.cumulus.cli.Main Query -q '<queryString>' -k <databaseName>
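As an illustration, a simple sanity-check query over a database named lubm (an illustrative name) would be:
java -cp cumulusrdf-1.0.1.jar edu.kit.aifb.cumulus.cli.Main Query -q 'SELECT * WHERE { ?s ?p ?o } LIMIT 10' -k lubm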
Notes
The CumulusRDF CLI is available here; we used version 1.0.1. Please note that the project has since moved to GitHub.
CouchbaseRDF
CouchbaseRDF is a "Proof of Concept" using an in-memory document database as the backend of a triplestore. Triples have to be loaded through a Java application that fills the database, and views are defined to access the data stored in the backend.
Installation
- Retrieve the RPM package:
wget https://packages.couchbase.com.s3.amazonaws.com/releases/3.0.1/couchbase-server-enterprise-3.0.1-centos6.x86_64.rpm
- Install it:
rpm -i couchbase-server-enterprise-3.0.1-centos6.x86_64.rpm
Configuration
Configuration is entirely done through the web interface: you just have to follow the steps of the detailed configuration documentation. We created a new cluster on the first machine, with the default calculated amount of RAM (60% of the machine's RAM). Then we added the other machines to this cluster to obtain a 10-machine cluster.
Once the cluster is functional, we had to create a bucket (i.e. a Couchbase database) to store the triples. Detailed installation documentation can be found here. The bucket has to be named default to work with the CouchbaseRDF implementation.
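As an alternative to the web interface, the bucket can also be created from the command line with couchbase-cli; the flags below follow Couchbase 3.0's tooling and should be treated as an assumption, and the node address, credentials and RAM quota are illustrative:
/opt/couchbase/bin/couchbase-cli bucket-create -c <clusterNode>:8091 -u Administrator -p <password> --bucket=default --bucket-type=couchbase --bucket-ramsize=1024 --bucket-replica=1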
Then, we have to define (and publish) 3 views to allow CouchbaseRDFLoader to insert triples into the bucket and CouchbaseRDF to query it:
PredObj
function (doc, meta) { if(doc[0] && doc[1]) { for(i = 0; i < doc[0].length; i++) { if(doc[1][i].length > 200) { emit([doc[0][i], doc[1][i]], null); } } } }
Pred
function (doc, meta) { if(doc[0] && doc[1]) { for(i = 0; i < doc[0].length; i++) { emit(doc[0][i], doc[1][i]); } } }
Obj
function (doc, meta) { if(doc[0] && doc[1]) { for(i = 0; i < doc[0].length; i++) { if(doc[1][i].length > 200) { emit([doc[1][i]], doc[0][i]); } } } }
The file called importInfo, located in the same directory as the CouchbaseRDFLoader jar, has to be edited to specify how to load triples into the bucket. Put the bucket name on the first line. On the second line, specify the URL of one of the machines of the cluster. On the third line, give the path to the file containing the triples to import. Put some text on the fourth line if an execution trace is needed.
The file called serverInfo, located in the same directory as the CouchbaseRDF jar, has to be edited to run the queries. Put the bucket name on the first line and the URL of one of the servers of the cluster on the second line. The third line controls the printing of results: if it is empty, only the query times are printed; if it contains some text, the query results are printed as well.
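To make this concrete, here is a sketch of the two files; the node URL, the dataset path and the word verbose are illustrative (the exact URL format expected by the applications is an assumption), and the third line of serverInfo is left empty so that only query times are printed:
cat > importInfo <<'EOF'
default
http://node1.example.org:8091
/path/to/Triples.nt
verbose
EOF
cat > serverInfo <<'EOF'
default
http://node1.example.org:8091

EOF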
To launch the import or query tasks, run the corresponding jar:
java -jar CouchbaseRDFLoader.jar importInfo
java -jar CouchbaseRDF.jar serverInfo
Notes
- CouchbaseRDFLoader is used to read and parse the RDF input file and to insert the data into the database.
- CouchbaseRDF is used to query the database using SPARQL.
- Couchbase version used: 3.0.1 Community Edition (build-1444)
RYA
Rya is a scalable system for storing and retrieving RDF data in a cluster of nodes. It uses a new serialization format for storing the RDF data, an indexing method to provide fast access to data, and query processing techniques for speeding up the evaluation of SPARQL queries. These methods take advantage of the storing, sorting, and grouping of data that Apache Accumulo provides.
Dependencies
- Hadoop (+HDFS) ≥ 2
- Zookeeper 3.4.5
- Apache Accumulo ≥ 1.6
Installation
There is no special installation procedure, since the loading and querying tasks are done with client applications packaged as JAR files.
Instructions
Loading is achieved with the BulkLoad tool, which uses MapReduce, as explained in the official documentation related to Load Data:
hadoop jar accumulo.rya-3.2.10-SNAPSHOT-shaded.jar mvm.rya.accumulo.mr.fileinput.BulkNtripsInputTool -Dac.zk=localhost:2181 -Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=triplestore_ -Dio.sort.mb=64 /path/to/triples.nt
- The -Dac.zk parameter has to be replaced by a comma-separated list of the Zookeeper instances present in the cluster (or at least the leader).
- The -Dac.instance parameter has to be replaced by the name of your Accumulo instance.
- The -Dac.username parameter has to be replaced by a username that is allowed to interact with your Accumulo cluster.
- The -Dac.pwd parameter has to be replaced by the password corresponding to your Accumulo user.
- The -Drdf.tablePrefix parameter has to be replaced by the prefix you want to use for the tables created for the uploaded dataset.
Querying is achieved through a Java program which parses queries from files and then executes them against the Accumulo store (see the official documentation related to Query Data).
Notes
- We used RYA 3.2.10-SNAPSHOT. Sources and documentation are available here.
- We used Accumulo 1.6.0-cdh5.1.4 on top of a Cloudera stack. Official website.
- Additionally, the system currently imposes some restrictions. For instance:
- The following triple pattern “?s ?p ?o .” is forbidden in the “WHERE{...}” clauses.
- There is no way to execute query number 2 of LUBM.
RDF on Apache Spark
Dependencies
- Apache Hadoop (+ HDFS) ≥ 2
- Apache Spark ≥ 1.2.1
Instructions
Copy the data (N-Triples) to HDFS:
hadoop fs -mkdir test ; hadoop fs -copyFromLocal Triples.nt test/ ;
Vertical partitioning:
spark-submit --driver-memory 4G --executor-memory 9G --class "GraphWorld" graph-project_2.10-1.0.jar test/Triples.nt test/ 2> load.log ;
Execute LUBM's set of queries:
for i in $(seq 1 14); do spark-submit --driver-memory 4G --executor-memory 9G --class "Q$i" graph-project_2.10-1.0.jar 2> Q$i.log ; done
Notes
CliqueSquare
Dependencies
- Java
- Hadoop (+HDFS), version strictly lower than 2!
PigSPARQL
PigSPARQL is a SPARQL query processor for MapReduce based on Apache Pig; in other words, it compiles a SPARQL fragment to PigLatin. The system has no real loading phase: it reads its data directly from HDFS in the W3C N-Triples standard. The compilation tries to optimize the execution plan through basic rewriting rules. The resulting programs are then executed as a series of MapReduce jobs.
Dependencies
- Java ≥ 1.6
- Hadoop Distributed File System and Hadoop MapReduce
- Apache Pig
Instructions
First, we translate the SPARQL query into a PigLatin script:
java -cp pigsparql-2.0.jar pigsparql.run.QueryCompiler -i ./myQuery.sparql -o ./myQuery.pig
Then, we simply execute the generated script on the MapReduce cluster using Pig:
pig -param jobName='default' -param inputData='myDataset.nt' -param outputData='out' -param reducerNum='k' myQuery.pig
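For instance, with the 40 reducers used in our setting (see the notes below) and illustrative job and file names, the call becomes:
pig -param jobName='lubm' -param inputData='Lubm1k.nt' -param outputData='out' -param reducerNum='40' myQuery.pig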
Notes
- We also specify the number of reducers we use. In our case, the parameter named k in the last command is set to 40 (#nodes * #cores_per_node).
- We used PigSPARQL version 2.0. Official website.
- For more information about PigLatin visit: http://pig.apache.org/.
Team
Authors: Damien Graux (damien DOT graux AT inria DOT fr), Louis Jachiet, Pierre Genevès, Nabil Layaïda
From the following institutions: Inria, Cnrs, LIG, UGA
Partially funded by: Datalyse Project