Friday, 2 August 2013

Using Apache Lucene to search text

Using Apache Lucene to search text:



Lucene is an open source, highly scalable text search-engine library available from the Apache Software Foundation. You can use Lucene in commercial and open source applications. Lucene's powerful APIs focus mainly on text indexing and searching. It can be used to build search capabilities for applications such as e-mail clients, mailing lists, Web searches, database search, etc. Web sites like Wikipedia, TheServerSide, jGuru, and LinkedIn have been powered by Lucene.
Lucene also provides search capabilities for the Eclipse IDE, Nutch (the famous open source Web search engine), and companies such as IBM®, AOL, and Hewlett-Packard. Lucene has been ported to many other programming languages, including Perl, Python, C++, and .NET. As of 30 Jul 2009, the latest version of Lucene in the Java™ programming language is V2.4.1.
Lucene has many features. It:
  • Has powerful, accurate, and efficient search algorithms.
  • Calculates a score for each document that matches a given query and returns the most relevant documents ranked by the scores.
  • Supports many powerful query types, such as PhraseQuery, WildcardQuery, RangeQuery, FuzzyQuery, BooleanQuery, and more.
  • Supports parsing of human-entered rich query expressions.
  • Allows users to extend the searching behavior using custom sorting, filtering, and query expression parsing.
  • Uses a file-based locking mechanism to prevent concurrent index modifications.
  • Allows searching and indexing simultaneously.
As shown in Figure 1, building a full-featured search application using Lucene primarily involves indexing data, searching data, and displaying search results.

Figure 1. Steps in building applications using Lucene

Steps in building applications using Lucene 


This article uses code snippets from a sample application developed in Java technology using Lucene V2.4.1. The example application indexes a set of e-mail documents stored in properties files and shows how to use Lucene's query APIs to search an index. The example will also familiarize you with basic index operations.
Lucene lets you index any data available in textual format. Lucene can be used with almost any data source as long as textual information can be extracted from it. You can use Lucene to index and search data stored in HTML documents, Microsoft® Word documents, PDF files, and more. The first step in indexing data is to make it available in simple text format. You can do this using custom parsers and data converters.
Indexing is a process of converting text data into a format that facilitates rapid searching. A simple analogy is an index you would find at the end of a book: That index points you to the location of topics that appear in the book.
Lucene stores the input data in a data structure called an inverted index, which is stored on the file system or memory as a set of index files. Most Web search engines use an inverted index. It lets users perform fast keyword look-ups and finds the documents that match a given query. Before the text data is added to the index, it is processed by an analyzer (using an analysis process).
Analysis is converting the text data into a fundamental unit of searching, which is called as term. During analysis, the text data goes through multiple operations: extracting the words, removing common words, ignoring punctuation, reducing words to root form, changing words to lowercase, etc. Analysis happens just before indexing and query parsing. Analysis converts text data into tokens, and these tokens are added as terms in the Lucene index.
Lucene comes with various built-in analyzers, such as SimpleAnalyzer, StandardAnalyzer, StopAnalyzer, SnowballAnalyzer, and more. These differ in the way they tokenize the text and apply filters. As analysis removes words before indexing, it decreases index size, but it can have a negative effect on precision query processing. You can have more control over the analysis process by creating custom analyzers using basic building blocks provided by Lucene. Table 1 shows some of the built-in analyzers and the way they process data.

Table 1. Lucene's built-in analyzers

AnalyzerOperations done on the text data
WhitespaceAnalyzerSplits tokens at whitespace
SimpleAnalyzerDivides text at non-letter characters and puts text in lowercase
StopAnalyzerRemoves stop words (not useful for searching) and puts text in lowercase
StandardAnalyzerTokenizes text based on a sophisticated grammar that recognizes: e-mail addresses; acronyms; Chinese, Japanese, and Korean characters; alphanumerics; and more
Puts text in lowercase
Removes stop words
Directory
An abstract class that represents the location where index files are stored. There are primarily two subclasses commonly used:
  • FSDirectory — An implementation of Directory that stores indexes in the actual file system. This is useful for large indices.
  • RAMDirectory — An implementation that stores all the indices in the memory. This is suitable for smaller indices that can be fully loaded in memory and destroyed when the application terminates. As the index is held in memory, it is comparatively faster.
Analyzer
As discussed, the analyzers are responsible for preprocessing the text data and converting it into tokens stored in the index. IndexWriter accepts an analyzer used to tokenize data before it is indexed. To index text properly, you should use an analyzer that's appropriate for the language of the text that needs to be indexed.
Default analyzers work well for the English language. There are several other analyzers in the Lucene sandbox, including those for Chinese, Japanese, and Korean.
IndexDeletionPolicy
An interface used to implement a policy to customize deletion of stale commits from the index directory. The default deletion policy is KeepOnlyLastCommitDeletionPolicy, which keeps only the most recent commits and immediately removes all prior commits after a new commit is done.
IndexWriter
A class that either creates or maintains an index. Its constructor accepts a Boolean that determines whether a new index is created or whether an existing index is opened. It provides methods to add, delete, or update documents in the index.
The changes made to the index are initially buffered in the memory and periodically flushed to the index directory.IndexWriter exposes several fields that control how indices are buffered in the memory and written to disk. Changes made to the index are not visible to IndexReader unless the commit or close method of IndexWriter are called.IndexWriter creates a lock file for the directory to prevent index corruption by simultaneous index updates.IndexWriter lets users specify an optional index deletion policy.
//Create instance of Directory where index files will be stored
Directory fsDirectory =  FSDirectory.getDirectory(indexDirectory);
/* Create instance of analyzer, which will be used to tokenize
the input data */
Analyzer standardAnalyzer = new StandardAnalyzer();
//Create a new index
boolean create = true;
//Create the instance of deletion policy
IndexDeletionPolicy deletionPolicy = new KeepOnlyLastCommitDeletionPolicy(); 
indexWriter =new IndexWriter(fsDirectory,standardAnalyzer,create,
 deletionPolicy,IndexWriter.MaxFieldLength.UNLIMITED);
  
There are two classes involved in adding text data to the index.
Field represents a piece of data queried or retrieved in a search. The Field class encapsulates a field name and its value. Lucene provides options to specify if a field needs to be indexed or analyzed and if its value needs to be stored. These options can be passed while creating a field instance. The table below shows the details of Field metadata options.

Table 2. Details of Field metadata options

OptionDescription
Field.Store.YesUsed to store the value of fields. Suitable for fields displayed with search results — file path and URL, for example.
Field.Store.NoField value is not stored — e-mail message body, for example.
Field.Index.NoSuitable for the fields that are not searched — often used with stored fields, such as file path.
Field.Index.ANALYZEDUsed for fields indexed and analyzed — e-mail message body and subject, for example.
Field.Index.NOT_ANALYZEDUsed for fields that are indexed but not analyzed. It preserves a field's original value in its entirety — dates and personal names, for example.
And a Document is a collection of fields. Lucene also supports boosting documents and fields, which is a useful feature if you want to give importance to some of the indexed data. Indexing a text file involves wrapping the text data in fields, creating a document, populating it with fields, and adding the document to the index using IndexWriter.
Listing 2 shows an example of adding data to an index.
/*Step 1. Prepare the data for indexing. Extract the data. */

String sender = properties.getProperty("sender");
String date = properties.getProperty("date");
String subject = properties.getProperty("subject");
String message = properties.getProperty("message");
String emaildoc = file.getAbsolutePath();

/* Step 2. Wrap the data in the Fields and add them to a Document */

Field senderField =
 new Field("sender",sender,Field.Store.YES,Field.Index.NOT_ANALYZED);
Field emaildatefield = 
  new Field("date",date,Field.Store.NO,Field.Index.NOT_ANALYZED); 
Field subjectField = 
 new Field("subject",subject,Field.Store.YES,Field.Index.ANALYZED);
Field messagefield = 
   new Field("message",message,Field.Store.NO,Field.Index.ANALYZED);
Field emailDocField =
 new Field("emailDoc",emaildoc,Field.Store.YES,
      Field.Index.NO);

Document doc = new Document();
// Add these fields to a Lucene Document
doc.add(senderField);
doc.add(emaildatefield);
doc.add(subjectField);
doc.add(messagefield);
doc.add(emailDocField);

//Step 3: Add this document to Lucene Index.
indexWriter.addDocument(doc);
      


Searching is the process of looking for words in the index and finding the documents that contain those words. Building search capabilities using Lucene's search API is a straightforward and easy process. This section discusses the primary classes from the Lucene search API.
Searcher is an abstract base class that has various overloaded search methods. IndexSearcher is a commonly used subclass that allows searching indices stored in a given directory. The Search method returns an ordered collection of documents ranked by computed scores. Lucene calculates a score for each of the documents that match a given query. IndexSearcher is thread-safe; a single instance can be used by multiple threads concurrently.
Term is the most fundamental unit for searching. It's composed of two elements: the text of the word and the name of the field in which the text occurs. Term objects are also involved in indexing, but they are created by Lucene internals.
Query is an abstract base class for queries. Searching for a specified word or phrase involves wrapping them in a term, adding the terms to a query object, and passing this query object to IndexSearcher's search method.
Lucene comes with various types of concrete query implementations, such as TermQuery, BooleanQuery, PhraseQuery, PrefixQuery, RangeQuery, MultiTermQuery, FilteredQuery, SpanQuery, etc. The section below discusses primary query classes from Lucene's query API.
TermQuery
The most basic query type for searching an index. TermQuery can be constructed using a single term. The term value should be case-sensitive, but this is not entirely true. It is important to note that the terms passed for searching should be consistent with the terms produced by the analysis of documents, because analyzers perform many operations on the original text before building an index.
For example, consider the e-mail subject "Job openings for Java Professionals at Bangalore." Assume you indexed this using the StandardAnalyzer. Now if we search for "Java" using TermQuery, it would not return anything as this text would have been normalized and put in lowercase by the StandardAnalyzer. If we search for the lowercase word "java," it would return all the mail that contains this word in the subject field.
//Search mails having the word "java" in the subject field
Searcher indexSearcher = new IndexSearcher(indexDirectory);
Term term = new Term("subject","java");
Query termQuery = new TermQuery(term);  
TopDocs topDocs = indexSearcher.search(termQuery,10); 
RangeQuery
You can search within a range using RangeQuery. All the terms are arranged lexicographically in the index. Lucene'sRangeQuery lets users search terms within a range. The range can be specified using a starting term and an ending term, which may be either included or excluded.
/* RangeQuery example:Search mails from 01/06/2009 to 6/06/2009 
both inclusive */
Term begin = new Term("date","20090601");
Term end = new Term("date","20090606");
Query query = new RangeQuery(begin, end, true);
PrefixQuery
You can search by a prefixed word with PrefixQuery, which is used to construct a query that matches the documents containing terms that start with a specified word prefix.
//Search mails having sender field prefixed by the word 'job'
PrefixQuery prefixQuery = new PrefixQuery(new Term("sender","job"));
PrefixQuery query = new PrefixQuery(new Term("sender","job"));
          
BooleanQuery
You can construct powerful queries by combining any number of query objects using BooleanQuery. It uses query and a clause associated with a query that indicates if a query should occur, must occur, or must not occur. In a BooleanQuery, the maximum number of clauses is restricted to 1,024 by default. You can set the maximum classes by calling thesetMaxClauseCount method.
// Search mails have both 'java' and 'bangalore' in the subject field
Query query1 = new TermQuery(new Term("subject","java"));
Query query2 = new TermQuery(new Term("subject","bangalore"));
BooleanQuery query = new BooleanQuery();
query.add(query1,BooleanClause.Occur.MUST);
query.add(query2,BooleanClause.Occur.MUST);
              
PhraseQuery
You can search by phrase using PhraseQuery. A PhraseQuery matches documents containing a particular sequence of terms. PhraseQuery uses positional information of the term that is stored in an index. The distance between the terms that are considered to be matched is called slop. By default the value of slop is zero, and it can be set by calling thesetSlop method. PhraseQuery also supports multiple term phrases.
/* PhraseQuery example: Search mails that have phrase 'job opening j2ee'
   in the subject field.*/
PhraseQuery query = new PhraseQuery();
query.setSlop(1);
query.add(new Term("subject","job"));
query.add(new Term("subject","opening"));
query.add(new Term("subject","j2ee"));
          
WildcardQuery
WildcardQuery implements a wild-card search query, which lets you do searches such as arch* (letting you find documents containing architect, architecture, etc.). Two standard wild cards are used:
  • * for zero or more
  • ? for one or more
There could be a performance drop if you try to search using a pattern in the beginning of a wild-card query, as all the terms in the index will be queried to find matching documents.
//Search for 'arch*' to find e-mail messages that have word 'architect' in the subject
field./
Query query = new WildcardQuery(new Term("subject","arch*"));
FuzzyQuery
You can search for similar terms with FuzzyQuery, which matches words that are similar to your specified word. The similarity measurement is based on the Levenshtein (edit distance) algorithm. In Listing 9, FuzzyQuery is used to find a close match of a misspelled word "admnistrtor," though this word is not indexed.
/* Search for emails that have word similar to 'admnistrtor' in the
subject field. Note we have misspelled admnistrtor here.*/
Query query = new FuzzyQuery(new Term("subject", "admnistrtor"));
QueryParser
QueryParser is useful for parsing human-entered query strings. You can use it to parse user-entered query expressions into a Lucene query object, which can be passed to IndexSearcher's search method. It can parse rich query expressions. QueryParser internally converts a human-entered query string into one of the concrete query subclasses. You need to escape special characters such as *? with a backslash (\). You can construct Boolean queries textually using the operators ANDOR, and NOT.
QueryParser queryParser = new QueryParser("subject",new StandardAnalyzer());
// Search for emails that contain the words 'job openings' and '.net' and 'pune'
Query query = queryParser.parse("job openings AND .net AND pune");
IndexSearcher returns an array of references to ranked search results, such as documents that match a given query. You can decide the number of top search results that need to be retrieved by specifying it in the IndexSearcher's search method. Customized paging can be built on top of this. You can add a custom Web application or desktop application to display search results. Primary classes involved in retrieving the search results are ScoreDoc and TopDocs.
ScoreDoc
A simple pointer to a document contained in the search results. This encapsulates the position of a document in the index and the score computed by Lucene.
TopDocs
Encapsulates the total number of search results and an array of ScoreDoc.
The code snippet below shows how to retrieve documents contained in the search results.

Listing 11. Displaying search results 

/* First parameter is the query to be executed and 
   second parameter indicates the no of search results to fetch */
   TopDocs topDocs = indexSearcher.search(query,20); 
   System.out.println("Total hits "+topDocs.totalHits);

   // Get an array of references to matched documents
   ScoreDoc[] scoreDosArray = topDocs.scoreDocs; 
   for(ScoreDoc scoredoc: scoreDosArray){
      //Retrieve the matched document and show relevant details
      Document doc = indexSearcher.doc(scoredoc.doc);
      System.out.println("\nSender: "+doc.getField("sender").stringValue());
      System.out.println("Subject: "+doc.getField("subject").stringValue());
      System.out.println("Email file location: "
     +doc.getField("emailDoc").stringValue()); 
   }


Basic index operations include removing and boosting documents.
Applications often need to update the index with the latest data and remove older data. For example, in the case of Web search engines, the index needs to be updated regularly as new Web pages get added and non-existent Web pages need to be removed. Lucene provides the IndexReader interface that lets you perform these operations on an index.
IndexReader is an abstract class that provides various methods to access the index. Lucene internally refers to documents with document numbers that can change as the documents are added to or deleted from the index. The document number is used to access a document in the index. IndexReader cannot be used to update indices in a directory for which IndexWriter is already opened. IndexReader always searches the snapshot of the index when it is opened. Any changes to the index are not visible untilIndexReader is reopened. It is important that applications using Lucene reopen their IndexReaders to see the latest index updates.

Listing 12. Deleting documents from index

// Delete all the mails from the index received in May 2009.
IndexReader indexReader = IndexReader.open(indexDirectory);
indexReader.deleteDocuments(new Term("month","05"));
//close associate index files and save deletions to disk
indexReader.close();


Sometimes you might want to give more importance to some of the indexed data. You can do so by setting a boost factor for a document or a field. By default, all the documents and fields have the same default boost factor of 1.0.

Listing 13. Boosting fields

if(subject.toLowerCase().indexOf("pune") != -1){
// Display search results that contain pune in their subject first by setting boost factor
 subjectField.setBoost(2.2F);
}
//Display search results that contain 'job' in their sender email address
if(sender.toLowerCase().indexOf("job")!=-1){ 
 luceneDocument.setBoost(2.1F);
}


Lucene provides the advanced feature called sorting. You can sort search results by fields that indicate the relative position of the documents in the index. The field used for sorting must be indexed but not tokenized. There are four possible kinds of term values that may be put into sorting fields: integers, longs, floats, or strings.
Search results can also be sorted by index order. Lucene sorts the results by decreasing relevance, such as computed score by default. Sorting order can also be changed.

Listing 14. Sorting search results

/* Search mails having the word 'job' in subject and return results
   sorted by sender's email in descending order.
 */
SortField sortField = new SortField("sender", true); 
Sort sortBySender = new Sort(sortField);
WildcardQuery query = new WildcardQuery(new Term("subject","job*"));
TopFieldDocs topFieldDocs = 
   indexSearcher.search(query,null,20,sortBySender);
//Sorting by index order
topFieldDocs = indexSearcher.search(query,null,20,Sort.INDEXORDER);
      


Filtering is a process that constrains the search space and allows only a subset of documents to be considered for search hits. You can use this feature to implement search-within-search results, or to implement security on top of search results. Lucene comes with various built-in filters such as BooleanFilter, CachingWrapperFilter, ChainedFilter, DuplicateFilter, PrefixFilter, QueryWrapperFilter, RangeFilter, RemoteCachingWrapperFilter, SpanFilter, etc. Filter can be passed to IndexSearcher's search method to filter documents that match the filter criteria.

Listing 15. Filtering search results

/*Filter the results to show only mails that have sender field 
prefixed with 'jobs' */
Term prefix = new Term("sender","jobs");
Filter prefixFilter = new PrefixFilter(prefix);
WildcardQuery query = new WildcardQuery(new Term("subject","job*"));
indexSearcher.search(query,prefixFilter,20);


Lucene, a very popular open source search library from Apache, provides powerful indexing and searching capabilities for applications. It provides a simple and easy-to-use API that requires minimal understanding of the internals of indexing and searching. In this article, you learned about Lucene architecture and its core APIs.
Lucene has powered various search applications being used by many well-known Web sites and organizations. It has been ported to many other programming languages. Lucene has a large and active technical user community. If you're looking for an easy-to-use, scalable, and high performing open-source search library, Apache Lucene is a great choice.

Friday, 31 May 2013

SQL is what’s next for Hadoop: Here’s who’s doing it

SQL is what’s next for Hadoop: Here’s who’s doing it:
When we first began putting together the schedule for Structure: Data several months ago, we knew that running SQL queries on Hadoop would be a big deal — we just didn’t know how big a deal it would actually become. Fast-forward to today, a mere month away from the event (March 20-21 in New York), and the writing on the wall is a lot clearer. SQL support isn’t the end-game for Hadoop, but it’s the feature that will help Hadoop find its way into more places in more companies that understand the importance of next-generation analytics but don’t want to (or can’t yet) re-invent the wheel by becoming MapReduce experts.
In fact, there are now so many products and projects pushing SQL queries and interactive data analysis on Hadoop — including two more announced this week — that it’s getting hard to keep track. But I’ll do my best.
Of course, Facebook began this whole movement to bring SQL database-like functionality to Hadoop when it created Hive in 2009. Hive, now an Apache project, includes a data-management layer and SQL-like query language called HiveQL. It has proven rather useful and popular over the years, but Hive’s reliance on MapReduce makes it somewhat slow by nature — MapReduce scans the entire data set and moves a lot of data over the network while processing a job — and there hasn’t been much effort to package it in a manner that might attract mainstream users.
And keep in mind that this next generation of SQL-on-Hadoop tools aren’t just business intelligence or database products that can access data stored in Hadoop; EMC Greenplum, HP Vertica, IBM Netezza, ParAccel, Microsoft SQL Server and Teradata/Aster Data (which this week released some cool new features for just this purpose) all allow some sort of access to Hadoop data. Rather, these are applications, frameworks and engines that let users query Hadoop data from inside Hadoop, sometimes by re-architecting the underlying compute and data infrastructures. The beauty of this approach is that data is usable in its existing form and, in theory, doesn’t require two separate data stores for analytic applications.

Data warehouses and BI: The Structure: Data set

Structure:Data: Put data to work. 60+ big data experts speaking. March 20-21, 2013, New York City. Register now.I’m highlighting this group of companies first, not because I think they’re the best (although that might well be), but because I’m truly excited about the panel they’ll be featured on at our conference next month. The panel is moderated by Facebook engineering manager Ravi Murthy– a guy who knows his way around a database — so they’ll have to answer some tough questions from one of the most-advanced and most-aggressive Hadoop and analytics tools users out there:
Apache Drill: Drill is a MapR-led effort to create a Google Dremel-like (or BigQuery-like) interactive query engine on top of Hadoop. First announced in August, the project is still under development and in the incubator program within Apache. According to its web site, “One explicitly stated design goal is that Drill is able to scale to 10,000 servers or more and to be able to process petabyes of data and trillions of records in seconds.”
Hadapt: Hadapt, which actually launched at Structure: Data in 2011, was the first of the SQL on Hadoop vendors and is somewhat unique in that it has a real product on the market and real users in production. Its unique architecture includes tools for advanced SQL functions and a split-execution engine for MapReduce and relational tasks, and both HDFS and relational storage. In October, the company announced a tight integration with Tableau Software around advanced visual analytics.
HAD_Graphic2-scaled
platforaarchPlatfora: Technically not a SQL product, Platfora is red-hot right now and is trying to re-imagine the world of business intelligence for a big data world. Essentially an HTML5 canvas laid atop Hadoop and an in-memory, massively parallel processing engine, the company’s software, which it unveiled in October, is designed to make analyzing data stored in Hadoop a fast and visually intuitive process.
Qubole: Qubole is an interesting case in that it’s essentially a cloud-based version of the popular Apache Hive framework launched by the guys who created Hive while working at Facebook. Qubole claims it auto-scaling abilities, optimized Hadoop code and columnar data cache make its service run much faster than Hive alone — and running on Amazon Web Services makes it easier than maintaining a physical cluster.
cache

Data warehouses and BI: The rest

Citus Data: Citus Data’s CitusDB isn’t just about Hadoop, but rather wants to bring the power of its distributed Postgres implementation to all types of data. It relies on Postgres’s foreign data wrappers feature to convert disparate data types into the database’s native format, and then on its own distributed-processing technology to carry out queries in seconds or less. Because of its Postgres foundation, CitusDB can join data from different data sources and retains all the native features that come with that database.
citus_hadoop_architecture
Cloudera ImpalaCloudera’s Impala might just be the most-important SQL-on-Hadoop effort around because of Cloudera’s expansive installation and partner footprints. It’s a massively parallel processing engine that bypasses MapReduce to enable interactive queries on data stored in either HDFS or HBase, using the same variant of SQL that Hive uses. However, because Cloudera doesn’t build applications, it’s relying on higher-level BI and analytics partners to provide the user interface.
impala
Karmasphere: Karmasphere is one of the first startups to build an analytic application atop Hadoop, and in its 2.0 release last year the company added support for SQL queries of data in HDFS. Like Hive, Karmasphere still relies on MapReduce to process queries, which means it’s inherently slower than newer approaches. However, unlike Hive, Karmasphere allows for parallel queries to run at the same time and includes a visual interface for writing queries and filtering results.
multiple-large
Lingual: Lingual is a new open source project from Concurrent (see disclosure), the parent company of the Cascading framework for Hadoop. Announced on Wednesday, Lingual runs on Cascading and gives developers and analysts a true ANSI SQL interface from which to run analytics or build applications. Lingual is compatible with traditional BI tools, JDBC  and the Cascading family of APIs.
Phoenix: Phoenix is a new and relatively unknown open source project that comes out of Salesforce.com and aims to allow fast SQL queries of data stored in HBase, the NoSQL database built atop HDFS. Its stated mission: “Become the standard means of accessing HBase data through a well-defined, industry standard API.” Users interact with it through JDBC interfaces, and its developers claim its sub-second response times for small queries and seconds-long response for querying tens of millions of rows.
A sample of Phoenix via the SQuirreL client
A sample of Phoenix via the SQuirreL client
sharkShark: Shark isn’t technically Hadoop, but it’s cut from the same cloth. Shark, in this case, stands for “Hive on Spark,” with Hive meaning the same thing it does to Hadoop, but with Spark being an in-memory platform designed to run parallel-processing jobs 100 times faster than MapReduce (a speed improve over traditional Hive that Shark also claims). Shark also includes APIs for turning query results into a type of data format amenable to machine learning algorithms. Both Shark and Spark are developed by the University of California, Berkeley’s AMPLab.

Screen-Shot-2013-02-19-at-5.37.01-PM-300x235Stinger Initiative: Launched on Wednesday (along with a security gateway called Knox and a faster, simpler processing framework called Tez), the Stinger Initiative is a Hortonworks-led effort to make Hive faster — up too 100x — and more functional. Stinger adds more SQL analytics capabilities to Hive, but the most-important aspects are infrastructural: an optimized execution engine, a columnar file format and the ability to avoid MapReduce bottlenecks by running atop Tez.

Operational SQL

Drawn to Scale: Drawn to Scale is a startup that has built an operational SQL database on top of HBase. The key word here is database, as its product, called Spire, is modeled after Google’s F1 designed to power transactional applications as analytic ones. Spire has a fully distributed index and queries are sent only to the node with the relevant data, so reads and writes are fast and the system can handle lots of concurrent users without falling down.
SpireArchitecture.015
spliceSplice Machine: Database startup Splice Machine is also trying to get into the operational space by building its Splice SQL Engine atop the naturally distributed HBase database. Splice Machine focuses its message on transactional integrity, which is really where it separates itself from scalable NoSQL databases and analytics-focused SQL-on-Hadoop efforts. It relies on HBase’s aut0-sharding feature in order to making scaling an easy process.
Structure:Data: Put data to work. 60+ big data experts speaking. March 20-21, 2013, New York City. Register now.
Feature image courtesy of Shutterstock user hauhu.
Upcoming: Structure:Data, Mar. 20-21, 2013, New York, Register by March 1 and save $200! More upcoming conferences.



Related research and analysis from GigaOM Pro:
Subscriber content. Sign up for a free trial.




Replication and the latency-consistency tradeoff

Replication and the latency-consistency tradeoff

by Daniel Abadi, dbmsmusings.blogspot.com
December 7th 2011

As 24/7 availability becomes increasingly important for modern applications, database systems are frequently replicated in order to stay up and running in the face of database server failure. It is no longer acceptable for an application to wait for a database to recover from a log on disk --- most mission-critical applications need immediate failover to a replica.
There are several important tradeoffs to consider when it comes to system design for replicated database systems. The most famous one is CAP --- you have to trade off consistency vs. availability in the event of a network partition. In this post, I will go into detail about a lesser-known but equally important tradeoff --- between latency and consistency. Unlike CAP, where consistency and availability are only traded off in the event of a network partition, the latency vs. consistency tradeoff is present even during normal operations of the system. (Note: the latency-consistency tradeoff discussed in this post is the same as the "ELC" case in my PACELC post).
The intuition behind the tradeoff is the following: there's no way to perform consistent replication across database replicas without some level of synchronous network communication. This communication takes time and introduces latency. For replicas that are physically close to each other (e.g., on the same switch), this latency is not necessarily onerous. But replication over a WAN will introduce significant latency.
The rest of this post adds more meat to the above intuition. I will discuss several general techniques for performing replication, and show how each technique trades off latency or consistency. I will then discuss several modern implementations of distributed database systems and show how they fit into the general replication techniques that are outlined in this post.
There are only three alternatives for implementing replication (each with several variations): (1) data updates are sent to all replicas at the same time, (2) data updates are sent to an agreed upon master node first, or (3) data updates are sent to a single (arbitrary) node first. Each of these three cases can be implemented in various ways; however each implementation comes with a consistency-latency tradeoff. This is described in detail below.
  1. Data updates are sent to all replicas at the same time. If updates are not first passed through a preprocessing layer or some other agreement protocol, replica divergence (a clear lack of consistency) could ensue (assuming there are multiple updates to the system that are submitted concurrently, e.g., from different clients), since each replica might choose a different order with which to apply the updates . On the other hand, if updates are first passed through a preprocessing layer, or all nodes involved in the write use an agreement protocol to decide on the order of operations, then it is possible to ensure that all replicas will agree on the order in which to process the updates, but this leads to several sources of increased latency. For the case of the agreement protocol, the protocol itself is the additional source of latency. For the case of the preprocessor, the additional sources of latency are:

    1. Routing updates through an additional system component (the preprocessor) increases latency
    2. The preprocessor either consists of multiple machines or a single machine. If it consists of multiple machines, an agreement protocol to decide on operation ordering is still needed across machines. Alternatively, if it runs on a single machine, all updates, no matter where they are initiated (potentially anywhere in the world) are forced to route all the way to the single preprocessor first, even if there is a data replica that is nearer to the update initiation location.

  2. Data updates are sent to an agreed upon location first (this location can be dependent on the actual data being updated) --- we will call this the "master node" for a particular data item. This master node resolves all requests to update the same data item, and the order that it picks to perform these updates will determine the order that all replicas perform the updates. After it resolves updates, it replicates them to all replica locations. There are three options for this replication:

    1. The replication is done synchronously, meaning that the master node waits until all updates have made it to the replica(s) before "committing" the update. This ensures that the replicas remain consistent, but synchronous actions across independent entities (especially if this occurs over a WAN) increases latency due to the requirement to pass messages between these entities, and the fact that latency is limited by the speed of the slowest entity.
    2. The replication is done asynchronously, meaning that the update is treated as if it were completed before it has been replicated. Typically the update has at least made it to stable storage somewhere before the initiator of the update is told that it has completed (in case the master node fails), but there are no guarantees that the update has been propagated to replicas. The consistency-latency tradeoff in this case is dependent on how reads are dealt with:
      1. If all reads are routed to the master node and served from there, then there is no reduction in consistency. However, there are several latency problems with this approach:
        1. Even if there is a replica close to the initiator of the read request, the request must still be routed to the master node which could potentially be physically much farther away.
        2. If the master node is overloaded with other requests or has failed, there is no option to serve the read from a different node. Rather, the request must wait for the master node to become free or recover. In other words, there is a potential for increased latency due to lack of load balancing options.

      2. If reads can be served from any node, read latency is much better, but this can result in inconsistent reads of the same data item, since different locations have different versions of a data item while its updates are still being propagated, and a read can potentially be sent to any of these locations. Although the level of reduced consistency can be bounded by keeping track of update sequence numbers and using them to implement "sequential/timeline consistency" or "read-your-writes consistency", these options are nonetheless reduced consistency options. Furthermore, write latency can be high if the master for a write operation is geographically far away from the requester of the write.

    3. A combination of (a) and (b) are possible. Updates are sent to some subset of replicas synchronously, and the rest asynchronously. The consistency-latency tradeoff in this case again is determined by how reads are dealt with. If reads are routed to at least one node that had been synchronously updated (e.g. when R + W > N in a quorum protocol, where R is the number of nodes involved in a synchronous read, W is the number of nodes involved in a synchronous write, and N is the number of replicas), then consistency can be preserved, but the latency problems of (a), (b)(i)(1), and (b)(i)(2) are all present (though to somewhat lower degrees, since the number of nodes involved in the synchronization is smaller, and there is potentially more than one node that can serve read requests). If it is possible for reads to be served from nodes that have not been synchronously updated (e.g. when R + W <= N), then inconsistent reads are possible, as in (b)(ii) above .

  3. Data updates are sent to an arbitrary location first, the updates are performed there, and are then propagated to the other replicas. The difference between this case and case (2) above is that the location that updates are sent to for a particular data item is not always the same. For example, two different updates for a particular data item can be initiated at two different locations simultaneously. The consistency-latency tradeoff again depends on two options:
    1. If replication is done synchronously, then the latency problems of case (2)(a) above are present. Additionally, extra latency can be incurred in order to detect and resolve cases of simultaneous updates to the same data item initiated at two different locations.
    2. If replication is done asynchronously, then similar consistency problems as described in case (1) and (2b) above present themselves.

Therefore, no matter how the replication is performed, there is a tradeoff between consistency and latency. For carefully controlled replication across short distances, there exists reasonable options (e.g. choice 2(a) above, since network communication latency is small in local data centers); however, for replication over a WAN, there exists no way around the significant consistency-latency tradeoff.
To more fully understand the tradeoff, it is helpful to consider how several well-known distributed systems are placed into the categories outlined above. Dynamo, Riak, and Cassandra choose a combination of (2)(c) and (3) from the replication alternatives described above. In particular, updates generally go to the same node, and are then propagated synchronously to W other nodes (case (2)(c)). Reads are synchronously sent to R nodes with R + W typically being set to a number less than or equal to N, leading to a possibility of inconsistent reads. However, the system does not always send updates to the same node for a particular data item (e.g., this can happen in various failure cases, or due to rerouting by a load balancer), which leads to the situation described in alternative (3) above, and the potentially more substantial types of consistency shortfalls. PNUTS chooses option (2)(b)(ii) above, for excellent latency at reduced consistency. HBase chooses (2) (a) within a cluster, but gives up consistency for lower latency for replication across different clusters (using option (2)(b)).
In conclusion, there are two major reasons to reduce consistency in modern distributed database systems, and only one of them is CAP. Ignoring the consistency-latency tradeoff of replicated systems is a great oversight, since it is present at all times during system operation, whereas CAP is only relevant in the (arguably) rare case of a network partition. In fact, the consistency-latency tradeoff is potentially more significant than CAP, since it has a more direct effect of the baseline operations of modern distributed database systems.

Original Page: http://dbmsmusings.blogspot.com/2011/12/replication-and-latency-consistency.html