Introduction
Presto is an open source distributed SQL engine for running interactive analytic queries on top of various data sources like Hadoop, Cassandra, and Relational DBMS etc. It is developed by Facebook to query Petabytes of data with low latency using Standard SQL interface.
With the help of Presto, data from multiple sources can be accessed, combined and analysed using a single SQL query. In this article we are going to run join queries on 2 tables –one of it is present in Apache Cassandra & second is present in Hive. First we will setup presto cluster then run standard SQL queries with Presto on the data stored in Apache Cassandra and Hive. A single presto query will first fetch data from Cassandra and Hive tables then process & analyse data based on query then result of this analysis will be stored in a new Hive Table.
Prerequisites
In order to run presto queries on Hive and Cassandra tables, below components must be installed and configured.
- A working Hadoop installation (single-node or multi-node). This can be achieved by following steps which are given here.
- Hive needs to be installed and configured as Presto uses hive metastore to query hive tables using hive metastore. This can be done by following steps given here.
- Hive metastore service should be up & running. It can be started using command $HIVE_HOME/bin/hive –service metastore &
- A working Apache Cassandra installation (single-node or multi-node) as we are going to query data from Apache Cassandra table. Presto requires apache Cassandra 2.0.3 or later version to be installed.
Installing and Configuring Presto for Hive and Cassandra Catalog
Presto can be installed and configured on multiple nodes to form a cluster. A presto cluster will consist of 3 components:
- Presto Coordinator
- Presto Worker
- Discovery Server
A single presto coordinator and discovery server is present in a presto cluster. Multiple workers are present to process data in parallel and each worker runs on a separate node.
Install Presto
Both presto coordinator and workers use the same installation setup. Use below steps to install presto on coordinator and each worker node.
– Download the Presto server tarball, presto-server-0.68.tar.gz, and unpack it using command
tar -xvf presto-server-0.68.tar.gz
– Create a data directory for storing presto logs, local metadata. This directory can be created anywhere but it is recommended to create it outside presto installation directory. For example, create data directory in /var/lib/presto using command
mkdir -p /var/lib/presto/data
Note: User should have the read and write permissions on presto data directory (/var/lib/presto/data).
Configure Presto
Create an etc directory inside the $PRESTO_INSTALLATION_DIR directory on each node using below command:
mkdir etc
Note: Here $PRESTO_INSTALLATION_DIR = path of directory where presto-server-0.68 is installed.
This will hold the following configuration:
– Node Properties: environmental configuration specific to each node
– JVM Config: command line options for the Java Virtual Machine
– Config Properties: configuration for the Presto server.
– Catalog Properties: configuration for connectors (data sources)
Node Properties
On each node, create a node.properties (at location $PRESTO_INSTALLATION_DIR/etc/node.properties) file which contains configuration specific to each node. A node is a single installed instance of Presto on a machine. This file is typically created by the deployment system when Presto is first installed. The following is a minimal etc/node.properties. Add these properties to the created file.
node.environment=productionnode.id=ffffffff-ffff-ffff-ffff-ffffffffffffnode.data-dir=/var/lib/presto/data
Here node.id is the unique identifier for this installation of Presto. This must be unique for every node. This identifier should remain consistent across reboots or upgrades of Presto.
JVM Config
On each node, create a jvm.config (at location $PRESTO_INSTALLATION_DIR/etc/jvm.config) file which contains a list of command line options used for launching the Java Virtual Machine. The format of the file is a list of options, one per line.
-server-Xmx16G-XX:+UseConcMarkSweepGC-XX:+ExplicitGCInvokesConcurrent-XX:+CMSClassUnloadingEnabled-XX:+AggressiveOpts-XX:+HeapDumpOnOutOfMemoryError-XX:OnOutOfMemoryError=kill -9 %p-XX:PermSize=150M-XX:MaxPermSize=150M-XX:ReservedCodeCacheSize=150M-Xbootclasspath/p:$PRESTO_INSTALLATION_DIR/presto-server-0.68/lib/floatingdecimal-0.1.jar
Note: In above configuration, -Xmx16G representsthe max memory allocated to presto server for current node. Change this parameter according to memory present in your environment.
Config Properties
On each node, create a config properties file (at location $PRESTO_INSTALLATION_DIR/etc/config.properties) which contains the configuration for the Presto server. Every Presto server can function as both a coordinator and a worker, but dedicating a single machine to only perform coordination work provides the best performance on larger clusters. For a presto cluster, there will be one coordinator and multiple workers running each on a separate machine.
Configurations For The coordinator:
Use below configuration properties in etc/config.properties file for coordinator node.
coordinator=truedatasources=jmxhttp-server.http.port=8080presto-metastore.db.type=h2presto-metastore.db.filename=var/lib/presto/MetaStoretask.max-memory=1GBdiscovery-server.enabled=truediscovery.uri=http://example.net:8080
Note: In discovery.uri property, replace example.net with IP address of your machine where discovery service will run.
Configurations For workers:
Use below configuration properties in etc/config.properties file for each workder node.
coordinator=falsedatasources=jmx,hive,cassandrahttp-server.http.port=8080presto-metastore.db.type=h2presto-metastore.db.filename=var/lib/presto/MetaStoretask.max-memory=1GBdiscovery-server.enabled=truediscovery.uri=http://example.net:8080
Note: In discovery.uri property, replace example.net with IP address of your machine where discovery service will run.
Log Levels
The optional log levels file, log.properties, allows setting the minimum log level for named logger hierarchies. First create this file at location $PRESTO_INSTALLATION_DIR/etc/log.properties and add below properties in it.
com.facebook.presto=DEBUG
Catalog Properties
Presto accesses data via connectors, which are mounted in catalogs. The connector provides all of the schemas and tables inside of the catalog.
Catalogs are registered by creating a catalog properties file in the $PRESTO_INSTALLATION_DIR/etc/catalog directory. For example, create $PRESTO_INSTALLATION_DIR/etc/catalog /jmx.properties file with the following contents to mount the jmx connector as the jmx catalog
connector.name=jmx
Create Hive Catalog
Presto includes Hive connectors for multiple versions of Hadoop:
- hive-hadoop1: Apache Hadoop 1.x
- hive-hadoop2: Apache Hadoop 2.x
- hive-cdh4: Cloudera CDH 4
- hive-cdh5: Cloudera CDH 5
Create $PRESTO_INSTALLATION_DIR/etc/catalog/hive.properties with the following contents to mount the hive-hadoop1 connector as the hive catalog, replacing hive-hadoop1 with the proper connector for your version of Hadoop and example.net:9083 with the correct host and port for your Hive metastore Thrift service:
connector.name=hive-hadoop1hive.metastore.uri=thrift://example.net:9083
Note: Replace example.net with the IP address of node where Hive is running.
Create Cassandra Catalog
Create etc/catalog/cassandra.properties with the following contents to mount the Cassandra connector as the Cassandra catalog
connector.name=cassandra# Comma separated list of contact pointscassandra.contact-points=host1,host2# Port running the native Cassandra protocolcassandra.native-protocol-port=9042# Limit of rows to read for finding all partition keys.cassandra.limit-for-partition-key-select=100000# number of splits generated if partition keys are unknowncassandra.unpartitioned-splits=1000# maximum number of schema cache refresh threads, i.e. maximum number of parallel requestscassandra.max-schema-refresh-threads=10# schema cache time to livecassandra.schema-cache-ttl=1h# schema refresh intervalcassandra.schema-refresh-interval=2m# Consistency level used for Cassandra queries (ONE, TWO, QUORUM, …)cassandra.consistency-level=ONE# fetch size used for Cassandra queriescassandra.fetch-size=5000 # fetch size used for partition key select querycassandra.fetch-size-for-partition-key-select=20000
Note: In cassandra.contact-points property above, replace host1, host2 with the IP addresses of your machines containing Apache Cassandra installation.
Install Discovery:
Presto uses the Discovery service to find all the nodes in the cluster. Every Presto instance will register itself with the Discovery service on start-up. This service needs to be installed and configured on any one node.
Download discovery-server-1.16.tar.gz, unpack it to create the installation directory, create the data directory, and then configure it to run on a different port than Presto. The standard port for Discovery is 8411
Configure Discovery
As with Presto, create an etc directory inside the installation directory to hold the configuration files.
Node Properties
Create the Node Properties file (discovery-server-1.16/etc/node.properties) the same way as for Presto, but make sure to use a unique value for node.id. For example:
node.environment=productionnode.id=ffffffff-ffff-ffff-ffff-ffffffffffffnode.data-dir=/var/lib/presto/discovery/data
JVM Config
Create the JVM Config file the same way as for Presto, but configure it to use fewer resources:
-server-Xmx1G-XX:+UseConcMarkSweepGC-XX:+ExplicitGCInvokesConcurrent-XX:+AggressiveOpts-XX:+HeapDumpOnOutOfMemoryError-XX:OnOutOfMemoryError=kill -9 %p
Config Properties
Create etc/config.properties with the following lone option:
http-server.http.port=8411
Run Discovery
The installation directory contains the launcher script in bin/launcher. Presto can be started as a daemon by running the following:
cd DISCOVERY_INSTALLATION_DIRbin/launcher start
Run Presto
The installation directory contains the launcher script in bin/launcher. Presto can be started as a daemon by running the following:
cd $PRESTO_INSTALLATION_DIRbin/launcher start
Start presto server using above command on each server present in the cluster.
Install and Configure Presto Client Interface (Cli)
- Download presto-cli-0.68-executable.jar from here
- Rename it to presto using Linux command mv presto-cli-0.68-executable.jar presto
- Provide execute permission to presto using Linux command chmod +x presto
- Now run presto cli using below command
./presto –server localhost:8080 –catalog hive –schema default
Note: Replace localhost with IP address of presto server.
Creating Tables and Populating Data in Cassandra and Hive
Now create tables in Apache Cassandra and Hive and populate data in these tables so that we can query these tables using presto.
Create Table in Apache Cassandra
Create a table orders in apache Cassandra using CQL and insert data into it using below commands:
cqlsh> CREATE KEYSPACE demodb WITH REPLICATION = { ‘class’ : ‘NetworkTopologyStrategy’, ‘dc1’ : 1 };cqlsh> USE demodb;cqlsh> CREATE TABLE user_purchases (user_id INT, item TEXT, quanity INT, amount FLOAT, time timestamp, place TEXT, PRIMARY KEY (user_id, timestamp));
Populate Data in Cassandra Table
After table creation in Apache Cassandra, populate some data using cqlsh
cqlsh> CREATE KEYSPACE demodb WITH REPLICATION = { ‘class’ : ‘NetworkTopologyStrategy’, ‘dc1’ : 1 };cqlsh> USE demodb;cqlsh> CREATE TABLE user_purchases (user_id INT, item TEXT, quanity INT, amount FLOAT, time timestamp, place TEXT, PRIMARY KEY (user_id, timestamp));cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (1, ‘Shirt’, 2, 3050.50, 1395639405, ‘New Delhi’);cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (1, ‘Shoes’, 3, 8140.60, 1398901516, ‘Noida’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (2, ‘Mobile Phone’, 1, 18300.00, 1406195803, ‘Gurgaon’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (3, ‘Laptop’, 1, 40140.60, 1401782401, ‘New Delhi’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (6, ‘chocolate’, 5, 500.30, 1401782405, ‘New Delhi’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (6, ‘Tablet’, 1, 20460.20, 1401782291, ‘Gurgaon’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (10, ‘Bat’, 1, 4860.20, 1337070341, ‘Mumbai’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (12, ‘clothes’, 4, 16450.00, 1295781836, ‘Chennai’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (9, ‘Bike’, 1, 65320.00, 1384490305, ‘Mumbai’); cqlsh> INSERT INTO user_purchases (user_id, item, quanity, amount, time, place) VALUES (11, ‘Music System’, 2, 26450.00, 1370489145, ‘New Delhi’);
Create Table in Hive
Create a table user_info in hive using below command in hive cli
hive> create table user_info (id INT, fname STRING, lname STRING, age INT, salary INT, gender STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’;
Populate data in Hive table
Create a text file user_info_data.csv and add some user related data for above created hive table
1,Steven,Smith,24,42000,Male 2,Pawan,Lathwal,24,30000,Male3,Mariya,Gilbert,25,44000,Female4,Taylor,Lockwood,24,41000,Male5,Sanjiv,Singh,25,51000,Male6,Peter,Mcculum,43,191000,Male7,Geeta,Rai,23,35000,Female8,Priyanka,Sachdeva,23,34000,Female9,Sanjiv,Puri,26,78000,Male10,Sachin,Tyagi,43,250000,Male11,Adam,Gilchrist,34,180000,Male12,Monika,Chandra,24,46000,Female13,Anamika,Malhotra,26,92000,Female
Now load this data into hive table using below command:
load data local inpath ‘user_info_data.csv’ overwrite into table user_info;
Query data using Presto
After loading data in both Cassandra table and Hive table, we are going to query this data from Presto client interface (cli). With the help of Presto, a single query can be executed to get data from both of these sources- Cassandra and Hive and combine the results.
The combined results (from Cassandra & Hive) from presto query can be either streamed to client or can be saved in a new Table. Here we will try both the approaches.
Presto Query 1: Combine data from Cassandra & Hive using Presto Join Query
First login to presto cli by using below command
./presto –server localhost:8080 –catalog hive –schema default
Note: Replace localhost with the IP address of node running presto server.
Then run below query on Presto cli
presto:default> select hive_user_info.id, hive_user_info.fname, hive_user_info.age, hive_user_info.salary, cassandra_user_purchases.item, cassandra_user_purchases.time, cassandra_user_purchases.place from hive.default.user_info hive_user_info join cassandra.stockticker.user_purchases cassandra_user_purchases on hive_user_info.id = cassandra_user_purchases.user_id;
The above presto query combines data from 2 tables – user_info table present in Hive and user_purchases table in Cassandra. A join is performed on these tables on the basis of common filed user_id and matching records will be shown as a result.
Presto Query 2: Get top 2 purchases from each user by combining data from Hive & Cassandra in single Presto Query
Run below presto query which joins the data from Cassandra and Hive table & output only top 2 purchases for each user based on the purchase_amount. This query uses complex analytic function of presto like row_number(), order by clause etc.
presto:default> select * from (select *, row_number() over(partition by id order by amount desc)as rnk from (select hive_user_info.id, hive_user_info.fname, hive_user_info.gender, hive_user_info.age, hive_user_info.salary, cassandra_user_purchases.item, cassandra_user_purchases.time, cassandra_user_purchases.place, cassandra_user_purchases.quanity, cassandra_user_purchases.amount from user_info hive_user_info join cassandra.stockticker.user_purchases cassandra_user_purchases on hive_user_info.id = cassandra_user_purchases.user_id)) where rnk <=2;
Presto Query 3: Get top 2 purchases from each user and save result in a new Table
This query first finds top 2 purchases from each user based on purchase amount then stores the output results on a new table user_top_purchases which will be created as a result of this query.
CREATE TABLE user_top_purchases as select * from (select *, row_number() over(partition by id order by amount desc)as rnk from (select hive_user_info.id, hive_user_info.fname, hive_user_info.gender, hive_user_info.age, hive_user_info.salary, cassandra_user_purchases.item, cassandra_user_purchases.time, cassandra_user_purchases.place, cassandra_user_purchases.quanity, cassandra_user_purchases.amount from user_info hive_user_info join cassandra.stockticker.user_purchases cassandra_user_purchases on hive_user_info.id = cassandra_user_purchases.user_id)) where rnk <=2;