Default partitioning
By default, Cluster will partition based on the primary key
When rows are added to a table that uses MySQL Cluster as its storage engine, each row is assigned to a partition, and each partition is mastered by a particular data node in the Cluster. The best performance comes when all of the data required to satisfy a transaction is held within a single partition, so that the transaction can be satisfied within a single data node rather than being bounced back and forth between multiple nodes, which introduces extra latency.
By default, Cluster partitions the data by hashing the primary key. This is not always optimal.
For example, if we have 2 tables, the first using a single-column primary key (sub_id) and the second using a composite key (sub_id, service_name)…
mysql> describe names;
+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| sub_id | int(11)     | NO   | PRI | NULL    |       |
| name   | varchar(30) | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+
mysql> describe services;
+--------------+-------------+------+-----+---------+-------+
| Field        | Type        | Null | Key | Default | Extra |
+--------------+-------------+------+-----+---------+-------+
| sub_id       | int(11)     | NO   | PRI | 0       |       |
| service_name | varchar(30) | NO   | PRI |         |       |
| service_parm | int(11)     | YES  |     | NULL    |       |
+--------------+-------------+------+-----+---------+-------+
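For reference, these two tables could have been created with statements along the following lines (a sketch reconstructed from the describe output above). Note that no PARTITION BY clause is specified, so Cluster falls back to its default of hashing the whole primary key:
mysql> create table names (sub_id int, name varchar(30),
    -> primary key (sub_id)) engine = ndb;
mysql> create table services (sub_id int, service_name varchar(30),
    -> service_parm int, primary key (sub_id, service_name)) engine = ndb;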
If we then add data to these (initially empty) tables, we can use the ‘explain’ command to see which partitions (and hence physical hosts) are used to store the data for a single subscriber…
mysql> insert into names values (1,'Billy');
mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666);
mysql> explain partitions select * from names where sub_id=1;
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+
| id | select_type | table | partitions | type  | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | names | p3         | const | PRIMARY       | PRIMARY | 4       | const |    1 |       |
+----+-------------+-------+------------+-------+---------------+---------+---------+-------+------+-------+
mysql> explain partitions select * from services where sub_id=1;
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
| id | select_type | table    | partitions  | type | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | services | p0,p1,p2,p3 | ref  | PRIMARY       | PRIMARY | 4       | const |   10 |       |
+----+-------------+----------+-------------+------+---------------+---------+---------+-------+------+-------+
The service records for the same subscriber (sub_id = 1) are split across 4 different partitions (p0, p1, p2 & p3). This means that the query results in messages being passed backwards and forwards between the 4 different data nodes, which consumes extra CPU time and incurs extra latency.
User-defined partitioning to the rescue
We can override the default behaviour by telling Cluster which fields should be fed into the hash algorithm. For our example, it’s reasonable to expect a transaction to access multiple records for the same subscriber (identified by their sub_id) and so the application will perform best if all of the rows for that sub_id are held in the same partition…
mysql> drop table services;
mysql> create table services (sub_id int, service_name varchar (30), service_parm int, primary key (sub_id, service_name)) engine = ndb
-> partition by key (sub_id);
mysql> insert into services values (1,'VoIP',20),(1,'Video',654),(1,'IM',878),(1,'ssh',666);
mysql> explain partitions select * from services where sub_id=1;
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
| id | select_type | table    | partitions | type | possible_keys | key     | key_len | ref   | rows | Extra |
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
|  1 | SIMPLE      | services | p3         | ref  | PRIMARY       | PRIMARY | 4       | const |   10 |       |
+----+-------------+----------+------------+------+---------------+---------+---------+-------+------+-------+
All of the rows for sub_id=1 in the services table are now held within a single partition (p3), which is the same partition that holds the row for that sub_id in the names table. Note that it wasn’t actually necessary to drop, recreate and re-provision the services table; the following command would have had the same effect:
mysql> alter table services partition by key (sub_id);
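If you want to double-check how a table is partitioned, SHOW CREATE TABLE reports any explicit PARTITION BY clause, and the INFORMATION_SCHEMA.PARTITIONS table gives a (for NDB, approximate) view of how the rows are spread across the partitions. For example (output omitted):
mysql> show create table services\G
mysql> select partition_name, table_rows
    -> from information_schema.partitions
    -> where table_schema = database() and table_name = 'services';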
Writing a distribution-aware application using the NDB API
Distribution-unaware NDB API application
In our example, the data is now partitioned for optimum performance when accessing all of a subscriber’s data – a single data node holds all of their rows. However, there is another step to take to get the best out of an NDB API based application. By default, the NDB API will use the Transaction Coordinator (TC) on a ‘random’ data node to handle the transaction – we could get lucky and the guess could be correct, but it is more likely that the transaction will be sent to the wrong data node, which will then have to proxy it to the correct one. The probability of getting it right first time falls as the number of node groups increases, and so this behaviour can prevent linear scaling.
It’s very simple to modify this behaviour so that the best data node/TC is hit first time, every time. When creating the transaction, the application can include parameters telling the NDB API one of the tables to be accessed and for what key(s). The NDB API will then use that information to identify the best TC to use…
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *namesTable= myDict->getTable("names");
const NdbDictionary::Table *servicesTable= myDict->getTable("services");
NdbRecAttr *myRecAttr;
// Build the distribution key hint: an array of key parts terminated by an
// entry whose ptr is NULL
Ndb::Key_part_ptr dist_key[2];
dist_key[0].ptr = (const void*) &sub_id;
dist_key[0].len = sizeof(sub_id);
dist_key[1].ptr = NULL;
dist_key[1].len = 0;
if (namesTable == NULL)
  APIERROR(myDict->getNdbError());
if (servicesTable == NULL)
  APIERROR(myDict->getNdbError());
// Pass a table and key part(s) as a hint so that the NDB API starts the
// transaction with the TC on the data node holding this subscriber's data
NdbTransaction *myTransaction= myNdb.startTransaction(namesTable,
                                                      dist_key);
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(namesTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->readTuple(NdbOperation::LM_Read);
myOperation->equal("sub_id",sub_id);
myRecAttr= myOperation->getValue("name", NULL);
if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());
// Operations on the "services" table can be added to the same transaction
// if required; this subscriber's services data is held in the same data node
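// As an illustration (a sketch, not part of the original example code), a read
// of one of this subscriber's service records could be added to the same
// transaction like this; the service name "VoIP" is just a sample value
NdbOperation *svcOperation= myTransaction->getNdbOperation(servicesTable);
if (svcOperation == NULL) APIERROR(myTransaction->getNdbError());
svcOperation->readTuple(NdbOperation::LM_Read);
svcOperation->equal("sub_id", sub_id);
// NDB stores VARCHAR values with a 1-byte length prefix (for columns of up to
// 255 characters), so the key value is packed into a buffer accordingly
char svc_name[31];
svc_name[0]= 4;
memcpy(svc_name + 1, "VoIP", 4);
svcOperation->equal("service_name", svc_name);
// After execute(), svcParm->int32_value() would hold the service_parm column
NdbRecAttr *svcParm= svcOperation->getValue("service_parm", NULL);
if (svcParm == NULL) APIERROR(myTransaction->getNdbError());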
if (myTransaction->execute( NdbTransaction::Commit ) == -1)
APIERROR(myTransaction->getNdbError());
printf(" %2d %sn",
sub_id, myRecAttr->aRef());
myNdb.closeTransaction(myTransaction);
Note that as the services table has been configured to use the same field (sub_id) for partitioning as the names table, the startTransaction method only needs to know about the names table; the TC that the NDB API selects will serve just as well for this subscriber’s data in the services table. The rest of the code can be found in distaware.