The Problem
We are running a cluster of 4 MongoDB servers. They are all joined into one replicaset, called "rs1", which holds a lot of user data. All of the data is organized in a UserID-oriented structure.
The load on these servers is getting higher and higher, most of it caused by write I/O from thousands of updates per second. Additionally, the disks are filling up. The project manager complains almost daily that some page loads take too long because MongoDB responds slowly to the queries from PHP, and the monitoring confirms his complaints.
----------------------------------------
| RS1                                  |
|                                      |
|  ---------------    ---------------  |
|  | Data node 1 |    | Data node 3 |  |
|  ---------------    ---------------  |
|                                      |
|  ---------------    ---------------  |
|  | Data node 2 |    | Data node 4 |  |
|  ---------------    ---------------  |
|                                      |
----------------------------------------
The Solution
Since most of the load on these servers is caused by write queries, not read queries, adding more servers to the cluster will not really solve the problem, because every node in a replicaset replicates every update. So we decided that the only sensible thing to do is to introduce sharding.
To shard our data would be relatively easy, because all of its queries and updates are by UserID anyway. So basically we could distribute all the data by UserID % 2 into two replicasets, and then decide inside the client code (PHP), based on the UserID, to which replicaset each query has to go.
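To illustrate the idea, here is a minimal sketch of such a routing rule in PHP (purely illustrative, not our production code):

<?php
// Hypothetical routing rule: even UserIDs stay on "rs1",
// odd UserIDs go to the new "rs2".
function replicasetFor($userId) {
    return ($userId % 2 === 0) ? 'rs1' : 'rs2';
}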
Since, in our case, it's so simple to implement sharding on the client side (PHP), we decided not to use MongoDB's own sharding feature. It would only be an additional point of failure and add complexity, without giving us an advantage.
-----------------------     -----------------------
| RS1                 |     | RS2                 |
|                     |     |                     |
|   ---------------   |     |   ---------------   |
|   | Data node 1 |   |     |   | Data node 3 |   |
|   ---------------   |     |   ---------------   |
|                     |     |                     |
|   ---------------   |     |   ---------------   |
|   | Data node 2 |   |     |   | Data node 4 |   |
|   ---------------   |     |   ---------------   |
|                     |     |                     |
|   ---------------   |     |   ---------------   |
|   |  Arbiter 1  |   |     |   |  Arbiter 2  |   |
|   ---------------   |     |   ---------------   |
|                     |     |                     |
-----------------------     -----------------------
By implementing this configuration, the amount of updates that goes to each data node is only 50% of what it was before, while the amount of reads per node stays roughly the same (half the reads are now served by half the nodes). The amount of disk space used is also reduced by almost half (except for the oplog).
The arbiter nodes are necessary because a MongoDB replicaset needs a majority of its voting members to elect a primary; with only 2 data nodes, the loss of either one would leave no majority. Arbiters don't hold any data and can basically run on any tiny machine without consuming noticeable resources.
Coming up with a Migration procedure
So we thought we had the perfect solution for our problem, but it seems that MongoDB replicasets are not really made for being split into multiple replicasets. We also weren't successful in googling for other people who had done a similar thing.
Of course it would be possible to mongodump all the data, break the replicaset, create two new ones, and then mongorestore half of the data into each of them. But these tools take many hours on large amounts of data (>200G in our case), and during the migration the whole cluster would have to go offline. We can't afford many hours of downtime on this cluster, so we were looking for the migration procedure that involves the least downtime possible, no matter how nice or ugly the procedure itself is.
Coming up with this solution took quite some research time, lots of trial and error in the lab, and the resulting procedure is not exactly simple.
A short explanation of the migration would be as follows. We remove two nodes from the replicaset, change their configs so they become a new one, start them as a new replicaset, add an arbiter to each of the replicasets to fulfill the requirement of having 3 nodes, change the client code to implement sharding, and then remove the unused data from each replicaset.
I'm sharing the exact process of how we did it here, in the hope of saving somebody else the research. In total it consists of around 12 steps, each of which I'm going to describe in detail.
This whole procedure should be done at a time when the cluster is less busy than at maximum peak. Although there is -no downtime- (yay!), the resources available to handle queries are reduced by 50% during part of the migration.
Preparing a Test Environment
To demonstrate the procedure, I first create a test environment of 4 data nodes and join them into a replicaset called "rs1".
mst@mst-gentoo-nb ~/ $ for i in 1 2 3 4; do mongod --dbpath ~/mongodb/instance${i} --nojournal --port 270${i}7 --logpath ~/mongodb/instance${i}/mongo.log --logappend --rest --replSet rs1 --oplogSize 128 --maxConns 20 --fork; done
forked process: 14736
all output going to: /home/mst/mongodb/instance1/mongo.log
child process started successfully, parent exiting
forked process: 14780
all output going to: /home/mst/mongodb/instance2/mongo.log
child process started successfully, parent exiting
forked process: 14824
all output going to: /home/mst/mongodb/instance3/mongo.log
child process started successfully, parent exiting
forked process: 14868
all output going to: /home/mst/mongodb/instance4/mongo.log
child process started successfully, parent exiting
Now I join all of them into the replicaset "rs1". To do that I connect to one of them, create the replicaset config, and do an rs.initiate(conf).
mst@mst-gentoo-nb ~/ $ mongo --port 27017
MongoDB shell version: 2.2.3
connecting to: 127.0.0.1:27017/test
> conf={
... "_id" : "rs1",
... "version" : 1,
... "members" : [
... {
... "_id" : 0,
... "host" : "localhost:27017"
... },
... {
... "_id" : 1,
... "host" : "localhost:27027"
... },
... {
... "_id" : 2,
... "host" : "localhost:27037"
... },
... {
... "_id" : 3,
... "host" : "localhost:27047"
... }
... ]
... }
> rs.initiate(conf)
{
"info" : "Config now saved locally. Should come online in about a minute.",
"ok" : 1
}
To be sure that no data gets lost during the migration procedure, I will add some test data into the replicaset.
rs1:PRIMARY> use mydb1
switched to db mydb1
rs1:PRIMARY> db.createCollection("mycol1")
{ "ok" : 1 }
rs1:PRIMARY> db.mycol1.insert({"_id":0, "key1":"value1"})
rs1:PRIMARY> db.mycol1.insert({"_id":1, "key2":"value2"})
rs1:PRIMARY> db.mycol1.insert({"_id":2, "key3":"value3"})
rs1:PRIMARY> db.mycol1.insert({"_id":3, "key4":"value4"})
Each migration step in detail
Prepare 2 new arbiter nodes
We will need two arbiter nodes because each replicaset should have at least 3 voting members, so that a majority is still available to elect a primary when one node is down. Currently we have only 4 data nodes to split into 2 replicasets, so we will add 1 arbiter to each of the 2 replicasets to reach a total of 3 nodes.
In the test environment I'll simply create a new empty directory for each of them to use as their dbpath.
mst@mst-gentoo-nb ~/mongodb $ mkdir arb1 arb2
Stop data nodes 3 and 4
It's best to not do this during the busiest peak times, because the amount of read queries that can still be handled is now drastically reduced.
mst@mst-gentoo-nb ~/mongodb $ cat instance3/mongod.lock instance4/mongod.lock
14824
14868
mst@mst-gentoo-nb ~/mongodb $ kill 14824 14868
Add 1 new arbiter node to the currently running cluster
The first of the 2 new arbiter nodes can now be started and added to the currently running cluster.
mst@mst-gentoo-nb ~/ $ mongod --dbpath ~/mongodb/arb1 --nojournal --port 27057 --logpath ~/mongodb/arb1/mongo.log --logappend --rest --replSet rs1 --oplogSize 128 --maxConns 20 --fork
forked process: 15988
all output going to: /home/mst/mongodb/arb1/mongo.log
child process started successfully, parent exiting
Then I connect to the current primary and add the arbiter.
rs1:PRIMARY> rs.addArb("localhost:27057")
{ "ok" : 1 }
Deploy a version of the client code
To make sure that nodes 3 and 4 are not used by the client (in our case PHP) during the next few steps, we need to deploy a new version of the code which only uses nodes 1 and 2. To do that, we simply change its configuration to use only the IPs of these 2 nodes as replicaset "rs1", instead of all 4.
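In PHP, this intermediate deploy can be as small as shrinking the seed list. A sketch, assuming the MongoClient class of the 1.3+ PHP driver and placeholder hostnames:

<?php
// Before: all four data nodes were listed as seeds for "rs1".
// $mongo = new MongoClient(
//     'mongodb://node1:27017,node2:27017,node3:27017,node4:27017',
//     array('replicaSet' => 'rs1'));

// Intermediate version: only nodes 1 and 2 are listed, so the
// client no longer tries to use nodes 3 and 4.
$mongo = new MongoClient(
    'mongodb://node1:27017,node2:27017',
    array('replicaSet' => 'rs1'));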
Remove nodes 3 and 4 from the replicaset configuration
Data nodes 3 and 4 are currently still in the config of replicaset "rs1". They are simply not used because the replicaset detects them as down. Since these 2 nodes will become the data nodes of the new replicaset "rs2", we can remove them from the config of "rs1" on the primary node.
rs1:PRIMARY> rs.conf()
{
"_id" : "rs1",
"version" : 2,
"members" : [
{
"_id" : 0,
"host" : "localhost:27017"
},
{
"_id" : 1,
"host" : "localhost:27027"
},
{
"_id" : 2,
"host" : "localhost:27037"
},
{
"_id" : 3,
"host" : "localhost:27047"
},
{
"_id" : 4,
"host" : "localhost:27057",
"arbiterOnly" : true
}
]
}
rs1:PRIMARY> rs.remove("localhost:27037")
rs1:PRIMARY> rs.remove("localhost:27047")
Start nodes 3 and 4 without the --replSet parameter
Next we are going to manually edit the replicaset definition inside nodes 3 and 4. To do that, we want to start them without any replicaset being active, so we just remove the --replSet parameter.
for i in 3 4; do mongod --dbpath ~/mongodb/instance${i} --nojournal --port 270${i}7 --logpath ~/mongodb/instance${i}/mongo.log --logappend --rest --oplogSize 128 --maxConns 20 --fork; done
forked process: 24482
all output going to: /home/mst/mongodb/instance3/mongo.log
child process started successfully, parent exiting
forked process: 24492
all output going to: /home/mst/mongodb/instance4/mongo.log
child process started successfully, parent exiting
OK, nodes 3 and 4 are started. Now we need to store a definition of the new replicaset "rs2" in their local.system.replset collection. Let's first look at what's there already.
> use local
switched to db local
> db.system.replset.find()
{ "_id" : "rs1", "version" : 2, "members" : [ { "_id" : 0, "host" : "localhost:27017" }, { "_id" : 1, "host" : "localhost:27027" }, { "_id" : 2, "host" : "localhost:27037" }, { "_id" : 3, "host" : "localhost:27047" }, { "_id" : 4, "host" : "localhost:27057", "arbiterOnly" : true } ] }
Obviously that's still the old config, including nodes 3 and 4. That's because they were down at the moment we removed them, so they couldn't replicate that change yet.
Build the config for rs2 and store it into the variable "conf". Note that port 27067 is the port which the second new arbiter will use once it is started.
> conf = { "_id" : "rs2", "version" : 2, "members" : [
... { "_id" : 0, "host" : "localhost:27037" },
... { "_id" : 1, "host" : "localhost:27047" },
... { "_id" : 2, "host" : "localhost:27067", "arbiterOnly" : true }
... ]}
{
"_id" : "rs2",
"version" : 2,
"members" : [
{
"_id" : 0,
"host" : "localhost:27037"
},
{
"_id" : 1,
"host" : "localhost:27047"
},
{
"_id" : 2,
"host" : "localhost:27067",
"arbiterOnly" : true
}
]
}
Insert the new config into db.system.replset
> db.system.replset.insert(conf)
Finally remove the old config for the replicaset "rs1"
> db.system.replset.remove({"_id":"rs1"})
These steps have to be repeated exactly the same way on each of nodes 3 and 4.
Activate the replicaset on nodes 3 and 4
Stop nodes 3 and 4 again
mst@mst-gentoo-nb ~/mongodb $ cat instance3/mongod.lock instance4/mongod.lock
24482
24492
mst@mst-gentoo-nb ~/mongodb $ kill 24482 24492
Restart them with the --replSet parameter. Make sure that --replSet specifies the name of the new replicaset, which in our case is "rs2". They will now try to bring the replicaset up, but realize that the new arbiter isn't reachable yet.
for i in 3 4; do mongod --dbpath ~/mongodb/instance${i} --nojournal --port 270${i}7 --logpath ~/mongodb/instance${i}/mongo.log --logappend --rest --replSet rs2 --oplogSize 128 --maxConns 20 --fork; done
forked process: 24947
all output going to: /home/mst/mongodb/instance3/mongo.log
child process started successfully, parent exiting
forked process: 25002
all output going to: /home/mst/mongodb/instance4/mongo.log
child process started successfully, parent exiting
Start the second arbiter node
Now nodes 3 and 4 are using their new replicaset "rs2", so they are already trying to connect to the second arbiter node. As soon as we start it on an empty dbpath, it should automatically join "rs2" and synchronize the config with nodes 3 and 4.
mst@mst-gentoo-nb ~/ $ mongod --dbpath ~/mongodb/arb2 --nojournal --port 27067 --logpath ~/mongodb/arb2/mongo.log --logappend --rest --replSet rs2 --oplogSize 128 --maxConns 20 --fork
forked process: 25840
all output going to: /home/mst/mongodb/arb2/mongo.log
child process started successfully, parent exiting
By running rs.status() on either node 3 or 4, we can see that they automatically add the new arbiter after a few seconds.
Tadaa! We have created a functioning second replicaset, which still contains all of the data that "rs1" contained.
Deploy the final client code
Since we now have 2 running replicasets which both contain all the data, we can deploy the client code that implements sharding. This code simply computes UserID % 2 before every mongo query and, based on the result, decides whether it should use "rs1" or "rs2". As soon as the new code is active, only half of the data in each of the replicasets will be used.
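A rough sketch of what the deployed routing could look like in PHP (hostnames, db and collection names are placeholders, error handling omitted):

<?php
// One client per replicaset; hostnames are placeholders.
$rs1 = new MongoClient('mongodb://node1:27017,node2:27017',
                       array('replicaSet' => 'rs1'));
$rs2 = new MongoClient('mongodb://node3:27017,node4:27017',
                       array('replicaSet' => 'rs2'));

// UserID % 2 decides which replicaset owns a user.
function connectionFor($userId, MongoClient $rs1, MongoClient $rs2) {
    return ($userId % 2 === 0) ? $rs1 : $rs2;
}

// Example: this update is routed to the replicaset owning UserID 7.
$userId = 7;
$col = connectionFor($userId, $rs1, $rs2)
    ->selectDB('mydb1')->selectCollection('mycol1');
$col->update(array('_id' => $userId),
             array('$set' => array('lastSeen' => new MongoDate())));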
To verify that all the data is still there, I connect to "rs1" and query it.
rs1:PRIMARY> use mydb1
switched to db mydb1
rs1:PRIMARY> db.mycol1.find()
{ "_id" : 0, "key1" : "value1" }
{ "_id" : 1, "key2" : "value2" }
{ "_id" : 2, "key3" : "value3" }
{ "_id" : 3, "key4" : "value4" }
Same on "rs2"
rs2:PRIMARY> use mydb1
switched to db mydb1
rs2:PRIMARY> db.mycol1.find()
{ "_id" : 0, "key1" : "value1" }
{ "_id" : 1, "key2" : "value2" }
{ "_id" : 2, "key3" : "value3" }
{ "_id" : 3, "key4" : "value4" }
Remove the unused half of data from both replicasets
Now we can safely remove the unused half of the data from each replicaset. This is best implemented on the client side. In our case it was really easy, because the PHP code knows which data has to be on which replicaset, so we could simply go through all UserIDs and remove each one from the replicaset where it is not used.
In order not to cause any additional load, we did the removal very slowly, over around 3 months. Each night, when most of the users are sleeping anyway and the db is less busy, around 50k records were removed.
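A simplified sketch of such a nightly cleanup job for "rs1" (batch size, sleep time and all names are illustrative; the real job was driven by the application's own user handling):

<?php
// Remove users from "rs1" that now live on "rs2".
$mongo = new MongoClient('mongodb://node1:27017,node2:27017',
                         array('replicaSet' => 'rs1'));
$col = $mongo->selectDB('mydb1')->selectCollection('mycol1');

$removed = 0;
$cursor = $col->find(array(), array('_id' => 1)); // fetch only the UserID
foreach ($cursor as $doc) {
    if ($doc['_id'] % 2 === 0) {
        continue;          // even UserIDs belong on "rs1", keep them
    }
    $col->remove(array('_id' => $doc['_id']));
    if (++$removed >= 50000) {
        break;             // enough for tonight, continue tomorrow
    }
    usleep(20000);         // throttle to keep the additional load low
}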
If at some point in the future we run into similar trouble again, with too much load caused by updates, we can repeat the same procedure: first double the amount of hardware, grow the two replicasets onto the new hardware, and then split each of the replicasets again.
Conclusion
The result of this whole migration was as expected. The load on each of the data nodes has decreased a lot, because the number of updates per node has been cut in half. The updates have also become faster, and the new performance numbers make the project manager happy.