Distributed SPARQL Evaluators

Abstract:
In an attempt to shed light on the performance and limitations of current distributed SPARQL evaluators, we evaluate a panel of 10 state-of-the-art implementations. We benchmark them on a common basis, according to metrics and requirements that are instrumental in several use cases that we identified. We report a detailed comparative analysis covering aspects such as velocity, resource consumption, and reaction to data updates. Our analysis pinpoints the advantages and limitations of current systems for the efficient evaluation of SPARQL queries on a commodity cluster.

Considered Metrics:
In order to compare the various systems, we first recall the standard metrics that focus solely on performance:
  1. Loading Time: measures the time taken by the system to load a set of triples in its own store.
  2. Query Response Time: measures the total time spent to evaluate SPARQL queries.
To push the evaluation further, we observe specific aspects that help analyze the cluster status during tests:
  1. Memory Usage: measures the disk footprint for a given dataset size, including indices and any auxiliary data structures.
  2. Network Traffic: shows how the workers use the cluster network to communicate with each other.
  3. CPU Load Balancing: shows which machines are actually working.
Finally, we add broader criteria that try to approach real use cases:
  1. Scalability: response times should decrease when additional nodes are added to the system.
  2. Resiliency: measures how performance of the system is affected when one or several nodes fail.
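
As a hedged illustration of the scalability criterion, the sketch below compares response times measured for one query on clusters of different sizes. The function names and the sample timings are hypothetical; they are not measurements from our experiments.

```python
def speedup(base_time, time_with_more_nodes):
    """Ratio of the baseline response time to the response time on a larger cluster."""
    if base_time <= 0 or time_with_more_nodes <= 0:
        raise ValueError("response times must be positive")
    return base_time / time_with_more_nodes


def scales(times_by_nodes):
    """Return True if the response time never increases as nodes are added.

    times_by_nodes maps a node count to a measured response time (seconds).
    """
    ordered = [times_by_nodes[n] for n in sorted(times_by_nodes)]
    return all(later <= earlier for earlier, later in zip(ordered, ordered[1:]))


# Hypothetical timings (seconds) for one query on 2, 5 and 10 nodes.
sample = {2: 40.0, 5: 20.0, 10: 16.0}
print(speedup(sample[2], sample[10]))  # 2.5
print(scales(sample))                  # True
```

A system meets the criterion, in this simplified reading, when `scales` holds for every query of the benchmark.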
Overview:
We present here an extension of the experiments reported in our paper: results on additional datasets are included, and they confirm the conclusions of the article.

4store

4store is a native triplestore that provides its own storage logic. It relies on sharding the data equally between the machines of the cluster. The data are indexed and written to disk. At query time, the driver parses the query and delegates the resolution to all the workers, then aggregates the intermediate results returned by the workers to produce the final result.
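
To make the shard-then-aggregate idea concrete, here is a minimal sketch in plain Python. It is not 4store's code: the hash-of-subject placement rule, the function names and the sample triples are assumptions chosen for illustration, and a hash only balances the shards approximately.

```python
import zlib
from collections import defaultdict


def shard_of(subject, num_machines):
    """Pick a machine for a triple from a stable hash of its subject (assumed rule)."""
    return zlib.crc32(subject.encode("utf-8")) % num_machines


def distribute(triples, num_machines):
    """Split (subject, predicate, object) triples into one list per machine."""
    shards = defaultdict(list)
    for s, p, o in triples:
        shards[shard_of(s, num_machines)].append((s, p, o))
    return shards


def evaluate(shards, predicate):
    """Driver side: ask every worker for matching triples, then merge the partial results."""
    partial_results = [
        [t for t in worker_triples if t[1] == predicate]  # work done by one worker
        for worker_triples in shards.values()
    ]
    return sorted(t for part in partial_results for t in part)


# Hypothetical data set.
triples = [
    ("ex:alice", "ex:knows", "ex:bob"),
    ("ex:bob", "ex:knows", "ex:carol"),
    ("ex:alice", "ex:age", "30"),
]
shards = distribute(triples, num_machines=3)
print(evaluate(shards, "ex:knows"))
```

Each worker only scans its own shard, and the driver's role is limited to merging the partial answers.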

Installation

Execute the following commands:
wget http://repo.sparql.pro/centos/sparql-pro-1.0.0-1.noarch.rpm ;
wget http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm ;
rpm -ivh sparql-pro-1.0.0-1.noarch.rpm ;
rpm -ivh epel-release-6-8.noarch.rpm ;
yum install 4store ;

Configuration

  • Start a worker on each machine of the cluster:
    4s-boss
  • Setup a triples backend on each machine of the cluster:
    4s-backend-setup --cluster <numbMachines> --node <nbMachine> --segments <numSeg> <backendName>
    • <numbMachines> stands for the total number of machines in the cluster.
    • <nbMachine> stands for the current machine number, beginning from 0.
    • <numSeg> stands for the number of segments each machine on the cluster will hold.
    • <backendName> stands for the backend's name.
  • On a single machine, which will be the driver, start the backend freshly created:
    4s-admin start-stores <backendName>

Notes

  • As stated in the 4store documentation, there have to be "twice as many segments as there are physical CPUs or CPU cores on the system" (rounded to the nearest power of two). In our case, we have 10 machines with 4 CPUs each, so we chose 64 segments.
  • The version of 4store we use is 1.1.5. Official website.
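
The arithmetic behind the segment count can be sketched as follows. This is one reading that reproduces the value of 64 given above (the rule applied to the 40 cores of the whole cluster); the function names are ours and the tie-breaking choice is an assumption.

```python
def nearest_power_of_two(n):
    """Return the power of two closest to n (ties go to the lower power)."""
    if n < 1:
        raise ValueError("n must be at least 1")
    lower = 1 << (n.bit_length() - 1)  # largest power of two <= n
    upper = lower * 2
    return lower if (n - lower) <= (upper - n) else upper


def recommended_segments(machines, cores_per_machine):
    """Twice the total number of cores, rounded to the nearest power of two."""
    return nearest_power_of_two(2 * machines * cores_per_machine)


print(recommended_segments(10, 4))  # 64
```

With 10 machines of 4 cores, twice the core count is 80, which lies closer to 64 than to 128.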

Experiments




CumulusRDF

CumulusRDF is a "Proof of Concept" using a HTable database, relying on Cassandra for the storage and OpenRDF Sesame as a SPARQL evaluator.

Dependencies

  • Cassandra cluster (version used: 1.2.19)
  • OpenRDF Sesame (embedded into client jar)

Installation

We used the version of Cassandra available in the DataStax package repository. More information on adding this repository to the CentOS repository list is available here.
Then, you can install all the needed packages by executing:

yum update

Finally:

yum install dsc12

Configuration

The cluster is configured to take into account the fact that the 10 virtual machines composing it are spread across 2 physical machines. We declared the two groups of virtual machines as 2 distinct racks (see Snitches).
After that, no more configuration is needed. Just launch the triple import with:

java -cp cumulusrdf-1.0.1.jar edu.kit.aifb.cumulus.cli.Main Load -i /path/to/input/file -k <databaseName> -s triple

Note that the database must not already exist. If it does, delete it, rename it, or create another one. These operations can be done using cqlsh (documentation here).
Then, you can query the database with:

java -cp cumulusrdf-1.0.1.jar edu.kit.aifb.cumulus.cli.Main Query -q '<queryString>' -k <databaseName>

Notes

The CumulusRDF CLI is available here in the version we used (1.0.1). Please note that the project has moved to GitHub.



CouchbaseRDF

CouchbaseRDF is a "Proof of Concept" using an in-memory document database as a backend for a triplestore. Triples have to be loaded by a Java application that fills the database. Views are defined to access the data stored in the backend.

Installation

  • Retrieve the RPM package:
    wget https://packages.couchbase.com.s3.amazonaws.com/releases/3.0.1/couchbase-server-enterprise-3.0.1-centos6.x86_64.rpm
  • Install the RPM package on each machine of the cluster:
    rpm -i couchbase-server-enterprise-3.0.1-centos6.x86_64.rpm

Configuration

All configuration goes through the web interface. You just have to follow the steps of the detailed configuration documentation. We created a new cluster on the first machine, with the default calculated amount of RAM (60% of the machine's RAM). Then we added the other machines to this cluster to obtain a 10-machine cluster.

Once the cluster was functional, we had to create a bucket (a Couchbase database) to store the triples. Detailed installation documentation can be found here. The bucket has to be named default to work with the CouchbaseRDF implementation.

Then, we have to define (and publish) 3 views to allow CouchbaseRDFLoader to insert triples into the bucket and CouchbaseRDF to query the bucket:

PredObj

function (doc, meta) {
  if(doc[0] && doc[1]) {
    for(var i = 0; i < doc[0].length; i++) {
      if(doc[1][i].length > 200) {
        emit([doc[0][i], doc[1][i]], null);
      }
    }
  }
}

Pred

function (doc, meta) {
  if(doc[0] && doc[1]) {
    for(var i = 0; i < doc[0].length; i++) {
      emit(doc[0][i], doc[1][i]);
    }
  }
}

Obj

function (doc, meta) {
  if(doc[0] && doc[1]) {
    for(var i = 0; i < doc[0].length; i++) {
      if(doc[1][i].length > 200) {
        emit([doc[1][i]], doc[0][i]);
      }
    }
  }
}

The file called importInfo, which is located in the same directory as the CouchbaseRDFLoader jar, has to be edited to specify how to load the triples into the bucket. Put the bucket name on the first line. On the second line, specify the URL of one of the machines of the cluster. On the third line, specify the path to the file containing the triples to import. Put some text on the fourth line if an execution trace is needed.

The file called serverInfo, which is located in the same directory as the CouchbaseRDF jar, has to be edited to run the queries. Put the bucket name on the first line and the URL of one of the servers of the cluster on the second line. The third line controls the printing of results: if it is empty, only the query times are printed; if it contains some text, the query results are printed as well as the times.
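
The layout of the two files described above can be sketched as follows. This is only an illustration of the line-by-line structure: the helper names, the host name, the URL shape and the flag texts ("trace", "print") are hypothetical, since any non-empty text is said to enable the corresponding option.

```python
def import_info(bucket, server_url, triples_path, trace=False):
    """Build the content of importInfo: bucket, server URL, input file, optional trace flag."""
    lines = [bucket, server_url, triples_path]
    if trace:
        lines.append("trace")  # any non-empty text enables the execution trace
    return "\n".join(lines) + "\n"


def server_info(bucket, server_url, print_results=False):
    """Build the content of serverInfo: bucket, server URL, optional result-printing flag."""
    lines = [bucket, server_url, "print" if print_results else ""]
    return "\n".join(lines) + "\n"


# Hypothetical host name and input path, for illustration only.
url = "http://node1.example.org:8091/pools"
print(import_info("default", url, "/path/to/triples.nt", trace=True))
print(server_info("default", url))
```

Writing the returned strings to files named importInfo and serverInfo, next to the corresponding jars, gives the configuration expected by the two commands below.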

To launch import or query tasks, launch execution of the corresponding jar.

java -jar CouchbaseRDFLoader.jar importInfo
java -jar CouchbaseRDF.jar serverInfo

Notes



RYA

Rya is a scalable system for storing and retrieving RDF data in a cluster of nodes. It uses a new serialization format for storing the RDF data, an indexing method to provide fast access to data, and query processing techniques for speeding up the evaluation of SPARQL queries. These methods take advantage of the storing, sorting, and grouping of data that Apache Accumulo provides.

Dependencies

  • Hadoop (+HDFS) ≥ 2
  • Zookeeper 3.4.5
  • Apache Accumulo ≥ 1.6

Installation

There is no special installation procedure, since the loading and querying tasks are done with client applications packaged as JAR files.

Instructions

Loading is achieved with the BulkLoad tool, which uses MapReduce, as explained in the official documentation related to Load Data.

hadoop jar accumulo.rya-3.2.10-SNAPSHOT-shaded.jar \
   mvm.rya.accumulo.mr.fileinput.BulkNtripsInputTool \
   -Dac.zk=localhost:2181 \
   -Dac.instance=accumulo \
   -Dac.username=root \
   -Dac.pwd=secret \
   -Drdf.tablePrefix=triplestore_ \
   -Dio.sort.mb=64 \
   /path/to/triples.nt
  • The -Dac.zk parameter has to be replaced by a comma-separated list of the Zookeeper instances present in the cluster (or at least the leader one).
  • The -Dac.instance parameter has to be replaced by the name of your Accumulo instance.
  • The -Dac.username parameter has to be replaced by the username that is allowed to interact with your Accumulo cluster.
  • The -Dac.pwd parameter has to be replaced by the password corresponding to your Accumulo user.
  • The -Drdf.tablePrefix parameter has to be replaced by the prefix you want to use to create the tables for the dataset to be uploaded.
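
As a convenience, the command above can be assembled from cluster-specific values with a small helper such as the one below. This is a sketch, not part of Rya: the function name and the ZooKeeper host names are hypothetical, and the remaining values are the placeholders already used in the command.

```python
import shlex


def bulk_load_command(zookeepers, instance, username, password, table_prefix, triples_path,
                      jar="accumulo.rya-3.2.10-SNAPSHOT-shaded.jar"):
    """Assemble the BulkLoad command line shown above from cluster-specific values."""
    return [
        "hadoop", "jar", jar,
        "mvm.rya.accumulo.mr.fileinput.BulkNtripsInputTool",
        "-Dac.zk=" + ",".join(zookeepers),  # comma-separated ZooKeeper list
        "-Dac.instance=" + instance,
        "-Dac.username=" + username,
        "-Dac.pwd=" + password,
        "-Drdf.tablePrefix=" + table_prefix,
        "-Dio.sort.mb=64",
        triples_path,
    ]


# Hypothetical ZooKeeper hosts; the other values are the placeholders used above.
cmd = bulk_load_command(
    ["zk1:2181", "zk2:2181", "zk3:2181"], "accumulo", "root", "secret",
    "triplestore_", "/path/to/triples.nt",
)
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the arguments as a list makes it possible to pass them directly to `subprocess.run` without any shell quoting issue.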

Querying is achieved with a Java program that parses queries from files, then executes them against the Accumulo store (see the official documentation related to Query Data).

Notes

  • We used RYA 3.2.10-SNAPSHOT. Sources and documentation are available here.
  • We used Accumulo 1.6.0-cdh5.1.4 on top of a Cloudera stack. Official website.
  • Additionally, the system currently imposes some restrictions. For instance:
    • The following triple pattern “?s ?p ?o .” is forbidden in the “WHERE{...}” clauses.
    • There is no way to execute query number 2 of LUBM.




RDF on Apache Spark

Dependencies

  • Apache Hadoop (+ HDFS) ≥ 2
  • Apache Spark ≥ 1.2.1

Instructions

Copy data (N-Triples) on the HDFS:

hadoop fs -mkdir test ;
hadoop fs -copyFromLocal Triples.nt test/ ;

Vertical partitioning:

spark-submit --driver-memory 4G --executor-memory 9G --class "GraphWorld" graph-project_2.10-1.0.jar test/Triples.nt test/ 2> load.log ;
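
For readers unfamiliar with the technique, vertical partitioning groups the triples by predicate, so that each predicate gets its own two-column (subject, object) table. The sketch below illustrates this in plain Python on a few in-memory lines. It is not the code of the GraphWorld class and does not use Spark; the parser is deliberately simplified and the sample data are hypothetical.

```python
from collections import defaultdict


def parse_ntriple(line):
    """Split a simple N-Triples line into (subject, predicate, object).

    Simplified: assumes whitespace-separated terms and a trailing ' .'.
    """
    subject, predicate, rest = line.strip().split(None, 2)
    obj = rest.rstrip()
    if obj.endswith("."):
        obj = obj[:-1].rstrip()
    return subject, predicate, obj


def vertical_partition(lines):
    """Group triples by predicate: one (subject, object) table per predicate."""
    tables = defaultdict(list)
    for line in lines:
        if not line.strip() or line.lstrip().startswith("#"):
            continue  # skip blank lines and comments
        s, p, o = parse_ntriple(line)
        tables[p].append((s, o))
    return dict(tables)


# Hypothetical sample data.
data = [
    '<http://ex.org/a> <http://ex.org/knows> <http://ex.org/b> .',
    '<http://ex.org/a> <http://ex.org/name> "Alice A." .',
    '<http://ex.org/b> <http://ex.org/knows> <http://ex.org/c> .',
]
for predicate, pairs in vertical_partition(data).items():
    print(predicate, pairs)
```

A triple pattern with a bound predicate then only needs to read the table of that predicate instead of scanning the whole dataset.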

Execute LUBM's set of queries:

for i in $(seq 1 14); do
   spark-submit --driver-memory 4G --executor-memory 9G --class "Q$i" graph-project_2.10-1.0.jar 2> Q$i.log ;
done

Notes








CliqueSquare

Dependencies

  • Java
  • Hadoop (+HDFS) < 2 (a 2.x version will not work)




PigSPARQL

PigSPARQL is a SPARQL query processor for MapReduce based on Apache Pig; in other words, it compiles a SPARQL fragment to PigLatin. This system has no real loading phase: it reads its data directly from the HDFS in the W3C N-Triples standard format. The compilation tries to optimize the execution plan through basic rewriting rules. The resulting programs are then executed as a series of MapReduce jobs.

Dependencies

  • Java ≥ 1.6
  • Hadoop Distributed File System and Hadoop MapReduce
  • Apache Pig

Instructions

First, we translate the SPARQL query into a PigLatin script:

java -cp pigsparql-2.0.jar pigsparql.run.QueryCompiler -i ./myQuery.sparql -o ./myQuery.pig

Then, we simply execute the generated script on the MapReduce cluster using Pig:

pig -param jobName='default' -param inputData='myDataset.nt' -param outputData='out' -param reducerNum='k' myQuery.pig

Notes

  • We also specify the number of reducers we use. In our case, the parameter named k in the last command is set to 40 (#nodes * #cores_per_node).
  • We used PigSPARQL version 2.0. Official website.
  • For more information about PigLatin visit: http://pig.apache.org/.
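
The reducer count and the resulting command line can be sketched as below. The helper names are ours; the parameter names and file names are those of the command above, and the node and core counts are the ones of our cluster (10 machines with 4 cores each).

```python
def reducer_count(nodes, cores_per_node):
    """Number of reducers: one per core available in the cluster."""
    return nodes * cores_per_node


def pig_command(script, input_data, output_data, reducers, job_name="default"):
    """Assemble the pig invocation shown above with a concrete reducer count."""
    params = {
        "jobName": job_name,
        "inputData": input_data,
        "outputData": output_data,
        "reducerNum": str(reducers),
    }
    cmd = ["pig"]
    for name, value in params.items():
        cmd += ["-param", "{}={}".format(name, value)]
    return cmd + [script]


k = reducer_count(nodes=10, cores_per_node=4)
print(k)  # 40
print(" ".join(pig_command("myQuery.pig", "myDataset.nt", "out", k)))
```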









Team

Authors
Damien Graux
damien DOT graux AT inria DOT fr

Louis Jachiet
Pierre Genevès
Nabil Layaïda
From the following institutions:
Inria
CNRS
LIG
UGA
Partially funded by:
Datalyse Project