InfiniSQL™ Overview

2013-11-12 01:17:28


Preface

This overview describes what InfiniSQL is, what its goals are, how those goals are being implemented, and the work that remains. It complements accompanying Guide and Reference documents.

Part I. Goals and Features

Chapter 1. About and Goals

InfiniSQL is a relational database management system (RDBMS) written entirely from the ground up. InfiniSQL's goals are:

  • Horizontal Scalability

  • Continuous Availability

  • High Throughput

  • Low Latency

  • High Performance For Complex, Multi-Host Transactions

  • Ubiquity

InfiniSQL has been tested to support over 500,000 complex transactions per second with over 100,000 simultaneous connections, on a cluster of only 12 single-socket x86-64 servers. The hardware available in that environment was exhausted by this effort--so the true upper limits of capacity are unknown. InfiniSQL's scalability across multiple nodes appears to be limitless!

InfiniSQL is an Open Source project with code hosted on GitHub. Contributions to development are sought--this documentation is meant to help newcomers to the project learn how to get involved. Open Source products tend to be the easiest to start working with from a user's standpoint: there's no teaser version of InfiniSQL. No hoops to jump through--just read the documentation, download and build the source, and ask for help along the way as needed. InfiniSQL is currently in an alpha state--it supports many of the features of a usable released product, but lacks others. It might support your application's needs. Please try it out! InfiniSQL is meant to be used as widely as possible. Once it reaches maturity, there will be no reason to use anything else for operational data store workloads.

InfiniSQL performs complex transaction workloads better than any other distributed database--whether a NewSQL or clustered legacy platform. And it performs simple keystore-type workloads as well as or better than many NoSQL solutions. NoSQL projects came about largely because existing RDBMS systems could not meet performance, scalability and budgetary needs of high growth, traffic-intensive applications. InfiniSQL is designed to perform heavy duty transaction processing workloads while scaling like NoSQL. This means that multiple workloads currently using point solutions can be standardized on a single backend platform. No sharding is necessary with InfiniSQL: it partitions data automatically across available hardware. Connect to any node, and all of the data is accessible. This simplifies development and management of the environment. InfiniSQL is a cost-effective and extreme scale RDBMS that cuts out the need for workarounds and point solutions.

Achieving these goals is unattainable for legacy RDBMS platforms that were designed decades ago. They weren't designed to scale well, if at all, beyond a single server. This design limitation means that legacy platforms do not make sense as a basis for limitless scalability in transaction processing. This is why InfiniSQL needed to be written from scratch, and not as an enhancement to an existing project.

Usage capabilities of InfiniSQL aren't necessarily limited to SQL, though SQL is currently the way in which data is stored and manipulated. Other types of interfaces, such as for messaging and object caching, can also be implemented to use InfiniSQL's transaction processing capabilities.

From the beginning, InfiniSQL has supported ad hoc SQL, multi-statement transactions, and stored procedures that make use of pre-compiled SQL queries, and it scales gracefully across hosts. Its capabilities will only grow with further development.

Please take a little bit more time to learn about InfiniSQL. Hopefully you'll be moved to participate in this project!

Chapter 2. Topology

InfiniSQL is a distributed database system--it operates on one or more hosts and has processes which perform separate functions. The two types of processes are manager and daemon processes. On the filesystem, they are named <sourcedir>/infinisqlmgr/infinisqlmgr.py and <installationdir>/sbin/infinisqld, respectively. Together, these processes comprise an InfiniSQL Cluster. The practical limit to the size of this cluster is unknown and presumably very high.

Manager Process

These processes are responsible for interacting with the administrator to start up the daemon processes, configure them, and maintain the topology of the cluster. Multiple processes are necessary to implement a quorum-based protocol for managing the cluster. The manager processes are responsible for knowing the health status of each daemon instance, known as a node. Managers send commands to bring daemons in and out of service, tell them when to interact with clients, and trigger jobs to maintain data integrity as cluster topology changes. Cluster reconfiguration involving dynamically adding and removing capacity, and adding and removing redundant data replicas, will be managed by these processes.

The current capability of the InfiniSQL Manager is to launch any number of configured daemon nodes and then exit. Dynamic cluster configuration, health checking, quorum management, and so forth have not been implemented yet. The framework has been started, and progress will continue.

Daemon Process

Daemons are the processes to which clients connect to manipulate data. Each daemon stores a portion of the database, and has access to data spread across all nodes. There is no need to shard InfiniSQL--all clients have access to the same data, regardless of the node to which they connect.

Chapter 3. Data Storage

InfiniSQL is currently an in-memory database. This means that all records are stored in system memory, and not written to disk. This provides very high performance--but it also means that InfiniSQL currently lacks the property of Durability. If the power goes out, all data is gone. This limitation is temporary. The InfiniSQL design calls for a number of features which will provide durability--while retaining very high performance.

  1. Synchronous database replicas with failover. Every infinisqld node has at least one replica to which all write activity is replicated for every transaction. When a host fails, the manager process will detect the failure and change the cluster topology to make use of that host's replica. This characteristic has also been referred to as "k-safety," and is conceptually similar to the storage protection method of RAID10.

    There is already code within InfiniSQL to allow configuration of replica hosts and to write to them synchronously. However, this feature is not yet complete.

  2. Managed Redundant Power. This mechanism is unique to InfiniSQL among databases, but is in wide use for storage arrays. In conjunction with the synchronous replication described above, every node is on an uninterruptible power supply (UPS). Each cluster has at least 2 UPS systems managed by the InfiniSQL Manager process. In case of total power failure in the datacenter, UPS systems will stay active for a few minutes, based on their capacity, and the manager process will gracefully shut down each daemon and write data to disk storage. This will ensure durability--even against power failure or system crash--while still maintaining in-memory performance.

    The mechanism described here is identical to how high end storage arrays are able to maintain high performance and data integrity. It is common practice in the most performance sensitive and mission critical enterprise storage environments. InfiniSQL intends to move the power protection from the block storage layer and protect the database application itself.

    This has not been implemented yet.

  3. Memory and disk hybrid. This mechanism makes use of in-memory caching and asynchronous writes to traditional block storage. It makes use of the previously described features to ensure Durability, and retains as much performance as possible, even though some block device accesses will be necessary. This storage medium will be ideal for workloads which require a rather high amount of storage capacity but do not require the absolute fastest performance.

    The real killer for database performance is synchronous transaction log writes. Even with the fastest underlying storage, this activity is the limiting factor for database write performance. InfiniSQL avoids this limiting factor while still retaining durability--and allowing larger data sets than practical with memory only.

    This feature has not yet been implemented.

Chapter 4. Data Access Methods

SQL is one of many possible ways to manipulate data. InfiniSQL uses the PostgreSQL Frontend/Backend Protocol, Version 3. This means that PostgreSQL drivers for any architecture should be able to communicate with InfiniSQL as if it were a PostgreSQL database.

Note

InfiniSQL and PostgreSQL are separate projects. The only PostgreSQL code used by the InfiniSQL daemon consists of symbol definitions of data types used by the Frontend/Backend Protocol. InfiniSQL also uses a patched version of pgbench for performance testing.

InfiniSQL currently supports a subset of ANSI SQL-92. The most critical elements are supported: INSERT, UPDATE, DELETE, and SELECT with search expressions, all of which can be contained within multi-statement transactions. More features will arrive in time. The InfiniSQL architecture will support any SQL language feature.
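
For illustration, here is a minimal client sketch that talks to InfiniSQL through libpq, the standard PostgreSQL C client library, since InfiniSQL speaks the PostgreSQL Frontend/Backend Protocol. The connection parameters and the ledger table below are placeholders, not InfiniSQL defaults; buyertable and buyerid are taken from the example query used later in this document.

    #include <libpq-fe.h>
    #include <cstdio>

    int main()
    {
        // Connection parameters are placeholders; point them at a node's Listener.
        PGconn *conn = PQconnectdb("host=127.0.0.1 port=5432 dbname=somedb user=someuser");
        if (PQstatus(conn) != CONNECTION_OK) {
            std::fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            return 1;
        }
        // A multi-statement transaction using the supported SQL-92 subset.
        const char *statements[] = {
            "BEGIN",
            "UPDATE buyertable SET balance = balance - 50 WHERE buyerid = 1234",
            "INSERT INTO ledger (buyerid, amount) VALUES (1234, -50)",
            "COMMIT"
        };
        for (const char *sql : statements) {
            PGresult *res = PQexec(conn, sql);
            if (PQresultStatus(res) != PGRES_COMMAND_OK) {
                std::fprintf(stderr, "%s failed: %s", sql, PQerrorMessage(conn));
                PQclear(res);
                PQfinish(conn);
                return 1;
            }
            PQclear(res);
        }
        PQfinish(conn);
        return 0;
    }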

Object caching and many other NoSQL workloads are easily accomplished with InfiniSQL. Most NoSQL solutions were implemented because of weaknesses in legacy RDBMS systems. InfiniSQL doesn't suffer from those shortcomings. InfiniSQL includes stored procedures that emulate keystore get & set behavior. It should be quite simple to map most NoSQL workloads and wire protocols onto InfiniSQL (but the converse is not true, as NoSQL systems generally can't do transactions).

Messaging is a workload often handled by separate infrastructure. InfiniSQL's high write throughput rate can suit it well as a persistent message queue, obviating the need for distinct infrastructure.

InfiniSQL will evolve into a platform for transaction processing of multiple types of activities, and not just SQL.

Part II. Internals

Chapter 5. Actor Model

InfiniSQL applies the actor model of concurrent programming to the problem of creating a scalable, continuously available transaction processing database. The main code base is implemented in C++ for performance reasons. But C++ does not natively support the actor model: much of the work in creating InfiniSQL has been to facilitate the actor model in C++.

Actors

The actors in InfiniSQL are implemented as POSIX threads. Classic actors are short-lived processes, such as those implemented in Erlang. Applications create as many actors as necessary to perform discrete tasks, and then dispose of them. However, since creating POSIX threads is relatively expensive, InfiniSQL does not follow the classic model in that regard. Instead, InfiniSQL's actors are long-lived threads. Each actor thread is dedicated to a particular set of functions, and each handles a very large number of concurrent activities. A minimal sketch of such a long-lived actor thread appears after the following list. InfiniSQL's actor types are as follows:

  • Topology Manager: this is the first actor for any infinisqld process. It is also the actor which launches all other actors. The infinisqlmgr.py command communicates with the Topology Manager of each node using 0mq and MessagePack.

  • Transaction Agent: this actor type reads requests from clients off of the network, parses the request, coordinates transaction processing activity for each request, and replies with query output back to the client. There can be many Transaction Agents per node for scalability.

  • Engine: this actor accesses and manipulates data. Each Engine corresponds to a memory region which contains a partition of each table and index for each schema in the database. Each memory region has only a single Engine actor. Much of the scalability problem inherent in legacy databases comes from locking data regions with mutexes and semaphores. InfiniSQL avoids this kind of contention because each data region has only a single Engine which accesses it. And each node can have many Engines with corresponding data partitions, to achieve scalability.

  • User & Schema Manager: a single actor of this type contains all data pertaining to schemata and authentication information. Table and index definitions as well as user and domain authentication are managed by this actor.

  • Deadlock Manager: InfiniSQL's transaction algorithm is a variation on two-phase locking. As such, it is subject to deadlocks. The Deadlock Manager resolves deadlocks.

  • Listener: there is a single Listener per host which communicates over TCP/IP with clients, and distributes incoming traffic across Transaction Agents.

  • Outbound Gateway: Outbound Gateway actors forward messages from other actors, such as Transaction Agents, that are destined for actors on remote infinisqld processes, such as Engines. Multiple Outbound Gateways can exist per node for scalability.

  • Inbound Gateway: Each Outbound Gateway has corresponding Inbound Gateways on each node within an InfiniSQL topology. Inbound Gateways receive messages from Outbound Gateways over the network, and forward them to their final destination actors. This is the mechanism by which InfiniSQL performs transactions with records distributed across any number of different nodes. Multiple Inbound Gateways can exist per node for scalability.
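
To make the long-lived actor idea concrete, here is a hypothetical skeleton of an actor thread (not InfiniSQL's actual classes): one POSIX thread runs a single loop for its entire lifetime, pulling work items off its mailbox and handling a bounded unit of work for any of its in-flight activities before returning to the loop. A mutex-guarded queue is used here only to keep the sketch short; the real Mbox is lock free, as described under Events & Messaging.

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <queue>
    #include <thread>

    struct WorkItem { int64_t transactionid; int payload; };

    class ActorThread {
    public:
        void start() { thread_ = std::thread(&ActorThread::run, this); }

        // Called by other actors (or the Listener) to hand this actor work.
        void post(const WorkItem &w) {
            { std::lock_guard<std::mutex> g(m_); q_.push(w); }
            cv_.notify_one();
        }

    private:
        // The long-lived event loop: one thread, many concurrent activities.
        void run() {
            for (;;) {
                std::unique_lock<std::mutex> g(m_);
                cv_.wait(g, [this] { return !q_.empty(); });
                WorkItem w = q_.front();
                q_.pop();
                g.unlock();
                handle(w);   // do a bounded unit of work, then loop again
            }
        }
        void handle(const WorkItem &) { /* per-actor-type logic goes here */ }

        std::thread thread_;            // shutdown/join handling omitted
        std::mutex m_;
        std::condition_variable cv_;
        std::queue<WorkItem> q_;
    };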

For any given activity, such as processing an SQL transaction, multiple actors are involved. Each actor performs its assigned task, including sending messages to other actors. This is in contrast to traditional database design where a single worker process (or thread, depending on the particular implementation) handles all activity for any given operation. Here is a simplified flow through various actors for an example query:

  1. Listener on node2 receives notification that a client-connected socket is ready to read, and forwards a message to Transaction Agent4.

  2. node2, Transaction Agent4

    1. reads socket

    2. parses query: UPDATE buyertable SET balance = balance - 50 WHERE buyerid = 1234;

    3. Since buyerid is the primary key of table buyertable, a hash of 1234 is calculated to show that the record resides on node5, Engine2 (a sketch of this partition lookup appears after the list)

    4. sends a message to node5, Engine2 by way of the Outbound Gateway on node2

  3. node2, Outbound Gateway1 sends the message over the network to Inbound Gateway1 on node5

  4. node5, Inbound Gateway1 forwards message to Engine2

  5. node5, Engine2 finds the record in buyertable where buyerid=1234, updates balance, and (simplifying somewhat) replies back to node2, Transaction Agent4 by way of node5, Outbound Gateway2

  6. node5, Outbound Gateway2 sends message over network to node2, Inbound Gateway2

  7. node2, Inbound Gateway2 sends message to node2, Transaction Agent4

  8. node2, Transaction Agent4 sends a COMMIT message to node5, Engine2, and steps 3-7 are repeated.

  9. node2, Transaction Agent4 sends the reply to the client
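
The hash-based lookup in step 2 above might look roughly like the following sketch. The hash function, the partition table, and the type names are illustrative assumptions, not InfiniSQL's actual code; the point is that every Transaction Agent can compute the owning node and Engine locally, without a central directory lookup.

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <vector>

    struct PartitionAddress { int16_t nodeid; int16_t engineid; };

    class PartitionMap {
    public:
        explicit PartitionMap(std::vector<PartitionAddress> partitions)
            : partitions_(std::move(partitions)) {}

        // buyerid 1234 always hashes to the same partition, so any node can
        // route the UPDATE directly to node5, Engine2 in the example above.
        PartitionAddress locate(int64_t primarykey) const {
            std::size_t h = std::hash<int64_t>{}(primarykey);
            return partitions_[h % partitions_.size()];
        }

    private:
        std::vector<PartitionAddress> partitions_;   // one entry per Engine
    };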

Events & Messaging

InfiniSQL actor processing is event-based. An actor receives an event, works on the activity requested, stores the state of the activity, if necessary, and then returns to the event loop to wait for another event. In this way a single actor thread can handle any number of concurrent transaction activities. In the traditional worker model, each worker handles a single transaction at a time and performs all activities associated with the transaction. With actors, each actor only performs a subset of tasks for each transaction, but handles tasks for any number of concurrent transactions.

InfiniSQL messaging is implemented in the Mbox*, Message*, Outbound Gateway, and Inbound Gateway classes. Each actor has a mailbox which is an instance of Mbox. The mailbox is a multi-producer, single-consumer queue (many senders, one receiver). The queue is very simple: it is a straight FIFO. There is no prioritization of messages, no peeking at them for later consumption out of order, or any other features beyond simply putting or grabbing the next message, if available. This simplicity pays dividends in performance.

Producing and consuming messages is synchronized among multiple threads by lock free atomic operations. This is much faster than a mutex-based implementation. Internal messaging is a matter of passing pointers to objects of various types of Messages. A very important dependency is Lockless, Inc.'s memory allocator. This is a replacement for malloc(), free() and realloc(). InfiniSQL can create, send, receive and then destroy millions of messages per second per host. Only Lockless' allocator has been found to support this workload.
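
The following is a minimal sketch of a lock-free multi-producer, single-consumer FIFO in the spirit of the Mbox description above. It is a generic Vyukov-style intrusive queue, not InfiniSQL's actual Mbox code, and the Msg type stands in for InfiniSQL's Message pointers.

    #include <atomic>

    struct Msg {
        std::atomic<Msg *> next{nullptr};
        // ... payload fields would go here ...
    };

    class MpscQueue {
    public:
        MpscQueue() : tail_(&stub_), head_(&stub_) {}

        // Any number of producer threads may call send().
        void send(Msg *m) {
            m->next.store(nullptr, std::memory_order_relaxed);
            Msg *prev = tail_.exchange(m, std::memory_order_acq_rel);
            prev->next.store(m, std::memory_order_release);   // link old tail to m
        }

        // Only the owning actor thread calls receive(); returns nullptr when empty.
        Msg *receive() {
            Msg *head = head_;
            Msg *next = head->next.load(std::memory_order_acquire);
            if (head == &stub_) {                     // skip the permanent dummy node
                if (next == nullptr) return nullptr;
                head_ = next;
                head = next;
                next = next->next.load(std::memory_order_acquire);
            }
            if (next != nullptr) { head_ = next; return head; }
            if (tail_.load(std::memory_order_acquire) != head)
                return nullptr;                       // a producer is mid-insert; try again
            send(&stub_);                             // re-append the dummy so the list never empties
            next = head->next.load(std::memory_order_acquire);
            if (next != nullptr) { head_ = next; return head; }
            return nullptr;
        }

    private:
        Msg stub_;                      // dummy node
        std::atomic<Msg *> tail_;       // producers swing this with exchange()
        Msg *head_;                     // consumer-private
    };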

Message passing between nodes is handled by Outbound Gateway and Inbound Gateway actors. Messages bound for remote nodes are sent through Outbound Gateway-Inbound Gateway pairs. Sending actors serialize messages bound for remote nodes, then pass them to an Outbound Gateway. The Outbound Gateway compresses these messages using LZ4 and sends them out in batches to each remote node. The Inbound Gateway decompresses the batch and sends each serialized message to its destination actor. The destination actor then deserializes and acts upon the message. Each node can have multiple gateways of each type to avoid bottlenecks.
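
As a rough illustration of the gateway path, the sketch below batches serialized messages and compresses them with the real LZ4 C API before they would be written to the remote node's socket. The batch framing shown here (simple concatenation) is an assumption for brevity; InfiniSQL's actual wire format is defined in its gateway code.

    #include <lz4.h>
    #include <string>
    #include <vector>

    // Outbound Gateway side: concatenate serialized messages, compress the batch.
    std::string compressBatch(const std::vector<std::string> &serializedMsgs)
    {
        std::string batch;
        for (const auto &m : serializedMsgs)
            batch += m;                        // real code would add per-message framing
        std::string out(LZ4_compressBound((int)batch.size()), '\0');
        int n = LZ4_compress_default(batch.data(), &out[0],
                                     (int)batch.size(), (int)out.size());
        out.resize(n > 0 ? n : 0);
        return out;                            // written to the Inbound Gateway's socket
    }

    // Inbound Gateway side: decompress, then split and route each message.
    std::string decompressBatch(const std::string &wire, int originalSize)
    {
        std::string batch(originalSize, '\0');
        int n = LZ4_decompress_safe(wire.data(), &batch[0],
                                    (int)wire.size(), originalSize);
        batch.resize(n > 0 ? n : 0);
        return batch;
    }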

Each actor is uniquely represented by the node upon which it resides and by the actor instance within that particular node. Each node in InfiniSQL is a process, not a physical (or virtual) host. There can be many nodes per host. Each nodeid is unique, and all actors are assigned an actorid unique within that particular node. This address is represented by the Topology::addressStruct structure.
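
Conceptually, the address amounts to a pair of small integers, along the lines of the sketch below (field names and widths here are assumptions; the authoritative definition is Topology::addressStruct in the source).

    #include <cstdint>

    struct addressStruct {
        int16_t nodeid;    // which infinisqld process (node) hosts the actor
        int16_t actorid;   // unique only within that node
    };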

Messages between actors within a node are sent via the MboxProducer::send() method, and received via Mbox::receive(). Each producer object is associated with a single destination Mbox object. A higher level layer for sending messages is provided by the Mboxes class. An instance of this type contains a collection of producers organized in various ways, such as by actor type (Transaction Agent, Engine, etc) and by data partition. Each actor has its own Mboxes object, which it updates upon receipt of messages from the Topology Manager. Mboxes methods allow for sending of messages based on type of destination actor and partition number. For example, Mboxes::toPartition() sends a message to the Engine responsible for a specific partition. If that resides on a remote node, then the message is passed to an Outbound Gateway. Otherwise, it's sent locally via MboxProducer::send().

There are a variety of Message types, but each inherits from the base Message class. Each message contains fields specific to its purpose, such as MessageSocket to notify a Transaction Agent that one of its clients has sent some data, or MessageTransaction for communication between Transaction Agents and Engines. Messages are serialized and deserialized for transport between nodes. Sending actors serialize messages bound for remote hosts. Receiving actors deserialize messages from remote nodes.
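
The sketch below shows the general shape this implies: a common header identifying the message's purpose and destination, with purpose-specific fields in each subclass. The member names, the enum, and the serialize() signature here are illustrative assumptions, not InfiniSQL's actual declarations.

    #include <cstdint>
    #include <string>

    enum class Topic : uint8_t { SOCKET, TRANSACTION /* , ... */ };

    struct Message {
        Topic topic;                                // what kind of message this is
        int16_t destnodeid;                         // routing information
        int16_t destactorid;
        virtual std::string serialize() const = 0;  // only needed for remote delivery
        virtual ~Message() = default;
    };

    struct MessageSocket : Message {                // "a client socket is ready to read"
        int sockfd = -1;
        std::string serialize() const override { return {}; /* sketch only */ }
    };

    struct MessageTransaction : Message {           // Transaction Agent <-> Engine traffic
        int64_t transactionid = 0;
        std::string serialize() const override { return {}; /* sketch only */ }
    };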

Most actors' event loops are focused on consuming messages from their Mbox. Exceptions to this are the Listener, which receives socket events via epoll(); the Inbound Gateway, which uses poll() for socket events; and the Topology Manager, which acts upon a 0mq socket.
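
For comparison with the Mbox-driven loops, here is a minimal epoll()-based loop of the kind the Listener runs. It assumes listenfd is an already bound, listening TCP socket; a real Listener would also register accepted client sockets with epoll and forward readiness notifications to Transaction Agents rather than handling them inline.

    #include <sys/epoll.h>
    #include <sys/socket.h>

    void listener_loop(int listenfd)
    {
        int epfd = epoll_create1(0);
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = listenfd;
        epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

        epoll_event events[64];
        for (;;) {
            int n = epoll_wait(epfd, events, 64, -1);   // block until activity
            for (int i = 0; i < n; ++i) {
                if (events[i].data.fd == listenfd) {
                    int clientfd = accept(listenfd, nullptr, nullptr);
                    (void)clientfd;  // register clientfd and hand it to a Transaction Agent
                } else {
                    // client socket readable: send a MessageSocket-style notification
                }
            }
        }
    }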

Transaction Processing

The transaction processing algorithm is a variation on that of two-phase locking (2PL). MVCC was considered initially, but is not scalable enough for the goals of InfiniSQL.[1] Instead, InfiniSQL uses 2PL which incorporates some additional elements:

  • writing transactions synchronously to multiple replicas

  • records distributed across any number of hosts

  • stages are performed asynchronously based on message passing

Most transaction processing logic is represented in the Transaction and SubTransaction classes. Transaction objects are associated with Transaction Agents, and SubTransactions are tied to Engines. Each transaction may involve records from any number of Engines on any number of nodes. All Engine activities on behalf of a transaction are performed by a corresponding SubTransaction. Each transaction, therefore, has a single SubTransaction associated with each Engine that contains records involved in the transaction. Each SubTransaction is uniquely associated with a single transaction.

Each stage of a transaction is handled asynchronously. Each Transaction Agent keeps a map of all associated transactions, keyed by transaction identifier, and maintains its own list of transactionids, so every unique transaction is identified by Transaction Agent instance and transactionid. Correspondingly, each Engine keeps a map of its associated SubTransactions. Transaction stages are as follows (a small sketch of how these stages might be tracked appears after the list):

  • read & parse: input is read from the client, and the SQL statement is parsed and planned

  • execute: corresponds with the 2PL expanding phase, where records are locked as a result of SQL execution. Proposed changes from INSERT, UPDATE and DELETE are staged to each record in a shadow table.

  • dispatch: this phase occurs after a commit has been issued by the client, and proposed changes to all involved records are sent to one or more replicas, to ensure redundancy

  • committing: after dispatch completes, each engine in the primary data replica is sent a message to COMMIT the transaction by writing staged row and index changes, and then unlocking

  • reply: after all engines have committed their records and replied, transaction results are returned to the client
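
A hedged sketch of how these asynchronous stages might be tracked per transaction follows; the enum values, field names, and map shown here are illustrative, not InfiniSQL's actual Transaction members.

    #include <cstdint>
    #include <map>

    enum class TxnStage { EXPANDING, DISPATCHING, COMMITTING, REPLYING };

    struct TransactionState {
        int64_t transactionid = 0;
        TxnStage stage = TxnStage::EXPANDING;
        int pendingReplies = 0;    // SubTransaction acknowledgements still outstanding
    };

    // Each Transaction Agent keeps its own map, keyed by transactionid. When a
    // reply message arrives, the agent looks the transaction up, decrements
    // pendingReplies, and advances stage once the count reaches zero.
    std::map<int64_t, TransactionState> transactions;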

Two-phase locking is subject to deadlocks. The Deadlock Manager actor implements a wait-for graph to resolve deadlocks. This area is under development, so for the time being, any attempt to access an already locked record results in a rolled-back transaction and a "lock not available" error reply to the client. This is only temporary, until lock management is properly implemented.

Continuations & Actor RPC

InfiniSQL makes extensive use of continuations. Transaction Agent activities are broken into several steps which wait for other actors, such as Engines, to complete necessary tasks. Each Transaction Agent could be handling many thousands of transactions at the same time. Therefore, they cannot block while waiting for other tasks to complete. Instead, continuations are used to allow Transaction Agent functions to pick up exactly where they left off when replies arrive.

In conjunction with messaging and continuations, InfiniSQL implements a form of inter-actor remote procedure calls (RPC). Transaction Agents are clients, and Engines, the Deadlock Manager and the User & Schema Manager are RPC servers. Messages from the Transaction Agent invoke a function on the "server." Function name and parameters are passed as members of the Message. Function execution populates the return message. When the reply is received, the Transaction Agent continues the function based on its stored state.
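
A minimal sketch of this pattern follows. The structure and member names are assumptions for illustration: the Transaction Agent records which step to resume at, sends a message naming the remote function and its parameters, and returns to its event loop; when the Engine's reply message is consumed, the same routine is re-entered at the saved step.

    #include <cstdint>

    struct RpcRequest { int64_t transactionid; int functionid; /* parameters... */ };
    struct RpcReply   { int64_t transactionid; int status;     /* results...    */ };

    struct PendingTransaction {
        int64_t transactionid = 0;
        int continuationPoint = 0;       // which step to resume at when a reply arrives
    };

    void updateBalance(PendingTransaction &txn /*, Mboxes &mboxes, RpcReply *reply */)
    {
        switch (txn.continuationPoint) {
        case 0:
            // Stage the UPDATE: send an RpcRequest to the owning Engine (via the
            // Mboxes layer), remember where to resume, and return immediately so
            // other transactions can be serviced in the meantime.
            txn.continuationPoint = 1;
            return;
        case 1:
            // Re-entered when the Engine's reply is consumed from the Mbox:
            // inspect the reply, send the COMMIT, reply to the client, and so on.
            return;
        }
    }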

SQL

SQL processing goes through the following steps:

  1. input read

  2. tokenized and parsed by the Larxer object, based on rules in lexer.ll and parser.yy

  3. Larxer formats a Statement object, including abstract syntax trees, and converts table and field names to numeric ids (an illustrative sketch of such a Statement appears after this list).

  4. If the SQL statement was submitted as an ad-hoc query, then the query is executed. If the statement was submitted as part of a compile command, the statement object is stored for later use by a stored procedure. Stored procedures are invoked as SQL statements supplied by the client, but themselves cannot become compiled statements.
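
An illustrative sketch of what a parsed Statement might carry follows; none of these names are InfiniSQL's real definitions, which live in the Larxer and Statement source files.

    #include <cstdint>
    #include <memory>
    #include <string>
    #include <vector>

    struct AstNode {                                  // one node of the abstract syntax tree
        std::string op;                               // e.g. "=", "AND", or a column reference
        std::vector<std::unique_ptr<AstNode>> children;
    };

    enum class StmtType { SELECT, INSERT, UPDATE, DELETE_STMT };

    struct Statement {
        StmtType type;
        int64_t tableid;                              // resolved from the table name
        std::vector<int64_t> fieldids;                // resolved from column names
        std::unique_ptr<AstNode> searchExpression;    // WHERE-clause AST
    };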



[1] 2PL was chosen in favor of multiversion concurrency control (MVCC) for scalability reasons. The limiting factor for MVCC is transaction identifier generation. For each transaction, MVCC requires a transaction identifier which is guaranteed to be greater than all previous transaction identifiers. There is no way to do this in parallel. This seemingly trivial operation, which at its simplest is a matter of atomically incrementing an integer, slows down drastically under concurrent load. Tested on a dual socket, 6-core per socket 3.3GHz Intel server, this operation can be done roughly 2,000,000 times per second with all cores and hyperthreads incrementing the integer. And that's not even considering network overhead from serving transaction identifiers from multiple nodes. So, to avoid such a limiting factor, 2PL was chosen as the basis for the concurrency control algorithm for InfiniSQL.