HBase / HBASE-2357

Coprocessors: Add read-only region replicas (slaves) for availability and fast region recovery

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: master, regionserver
    • Labels: None

    Description

      I don't plan on working on this in the short term, but the idea is to extend region ownership to have two modes. Each region has one primary region server and N slave region servers. The slaves would follow the primary (probably by streaming the relevant HLog entries directly from it) and be able to serve stale reads. The benefit is twofold: (a) it provides the ability to spread read load, and (b) it enables very fast region failover/rebalancing, since the memstore is already nearly up to date on the slave RS.
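
To make the follower idea concrete, here is a minimal Java sketch of a slave replica for one region. It assumes, hypothetically, that the primary streams edits tagged with increasing sequence ids. The names `WalEdit` and `ReadReplica` are illustrative and are not HBase classes, and a sorted map stands in for the shadow memstore. Reads return whatever has been applied so far, which is the "stale read" the description allows.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical streamed WAL entry: only the fields this sketch needs.
final class WalEdit {
    final long seqId;
    final String row;
    final String value;

    WalEdit(long seqId, String row, String value) {
        this.seqId = seqId;
        this.row = row;
        this.value = value;
    }
}

// Hypothetical read-only replica ("slave") of a single region.
final class ReadReplica {
    // Stand-in for the slave's shadow memstore: row -> latest value.
    private final NavigableMap<String, String> memstore = new TreeMap<>();
    private long lastAppliedSeqId = 0;

    // Apply edits in the primary's commit order and skip anything already
    // applied, e.g. an edit redelivered after a reconnect.
    void apply(WalEdit edit) {
        if (edit.seqId <= lastAppliedSeqId) {
            return;
        }
        memstore.put(edit.row, edit.value);
        lastAppliedSeqId = edit.seqId;
    }

    // Stale read: reflects only the edits applied so far (null if unknown row).
    String get(String row) {
        return memstore.get(row);
    }

    // Distance, in sequence ids, between the primary's newest edit and this replica.
    long lag(long primaryLastSeqId) {
        return Math.max(0, primaryLastSeqId - lastAppliedSeqId);
    }

    long lastAppliedSeqId() {
        return lastAppliedSeqId;
    }
}
```

Under these assumptions, promoting such a replica on failover would only require the edits after `lastAppliedSeqId()`, which is the intuition behind point (b).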

          Activity

            apurtell Andrew Kyle Purtell added a comment -

            Should we stream edits with no freshness guarantee, or use ZAB or Paxos over small (N=3) cliques? The latter can do away with the WAL as an option, or the leader can maintain the WAL as part of the write transaction. This would still allow (a) and (b) but strengthen the consistency of both. It's not clear there would be a significant write penalty beyond what we already take with a durable WAL (hflush), especially if the WAL is only used when all members of a clique fail, so the consensus protocol and hflush can happen in parallel. Crazy idea?

            ryanobjc ryan rawson added a comment -

            That is so crazy it just might work!

            I wonder how slow updates might get?

            tlipcon Todd Lipcon added a comment -

            Well, this whole thing is a crazy idea; I don't anticipate working on it until a lot of other, much more important things are done.

            As for your specific brand of crazy idea, I think log shipping is a well-proven and simple method that should cover the majority of use cases. Consensus is tricky to get right, and while using an underlying, well-tested protocol like ZAB helps, it is still nowhere near easy. It also means that writes on one node are blocked by slaves. So I'm -1 on that, but only as much as one can be -1 on a crazy idea while proposing another crazy idea.


            apurtell Andrew Kyle Purtell added a comment -

            Writes would be blocked by the slowest member of the clique, but if this scheme spreads (strongly consistent!) read load more widely, then in theory the probability of hot accesses to a particular region server starving the write side is lowered accordingly. We could mock it and see what happens, and/or try to work through some of the particulars formally. Like Ryan, I wonder how slow updates might get. Consider running ZAB on a 3-node clique and hflush in parallel, committing with a barrier on completion of both. Who wins the race? How often would hflush take longer? It could be a substantial percentage, especially in an environment with mixed HBase and HDFS load (plain MapReduce, Hive, Pig, Cascading, ...). My point is that it's not clear hflush would not dominate.

            What I don't like about log shipping is that the read replicas are not going to be useful to someone who uses HBase for its strong consistency and needs it, except for use cases that can accept results that are consistent as of the timestamp of the last replication. (But that timestamp could be different on each slave, so the master and slaves might all have different views!) With a consensus protocol, read load can be spread as this issue intends, and yet the data is still strongly consistent.

            So I might humbly suggest that both ideas have pros and cons, and that neither warrants a -1 nor a +1 at this point, IMO.
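
The commit rule in the comment above (ZAB on a 3-node clique and hflush in parallel, with a barrier on completion of both) can be sketched with `CompletableFuture`. This is not ZAB: leader election, ordering, and recovery are all omitted. Acks are modeled as futures, one per clique member with the leader included, and `ParallelCommit` is a hypothetical name. The sketch only shows two things: commit latency is the slower of the hflush and the time needed to collect a majority, and a clique of 2N+1 members still reaches a majority with N members slow or down.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical commit barrier: WAL hflush and a quorum round run in parallel.
final class ParallelCommit {
    // Majority of a clique: 2 of 3, 3 of 5. A clique of 2N+1 tolerates N failures.
    static int majority(int cliqueSize) {
        return cliqueSize / 2 + 1;
    }

    // Completes once `needed` of the ack futures have completed normally.
    // Failed acks simply never count; there is no timeout handling here.
    static CompletableFuture<Void> quorum(List<CompletableFuture<Void>> acks, int needed) {
        CompletableFuture<Void> reached = new CompletableFuture<>();
        AtomicInteger count = new AtomicInteger();
        for (CompletableFuture<Void> ack : acks) {
            ack.thenRun(() -> {
                if (count.incrementAndGet() == needed) {
                    reached.complete(null);
                }
            });
        }
        return reached;
    }

    // The edit counts as committed only when BOTH the hflush and the quorum
    // are done, so commit latency is whichever of the two is slower.
    static CompletableFuture<Void> commit(CompletableFuture<Void> hflush,
                                          List<CompletableFuture<Void>> memberAcks) {
        CompletableFuture<Void> q = quorum(memberAcks, majority(memberAcks.size()));
        return CompletableFuture.allOf(hflush, q);
    }
}
```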

            apurtell Andrew Kyle Purtell added a comment -

            Minor clarification:

            > But with a consensus protocol, read load can be spread as is the intent of this issue and yet the data is still strongly consistent

            ...at any time, on any region server hosting the region (or a replica), so an ICV (incrementColumnValue) is atomic no matter which region server a client is talking to, for example.
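
The ICV point can be illustrated with state-machine replication. If consensus has already fixed the order of the increments, every replica that applies the same log computes the same counter values, so each increment returns the same result whichever replica answers. This sketch takes that agreed log as its input and does not show the consensus protocol itself. `IncrementCmd` and `CounterReplica` are hypothetical names, and the string key stands in for a row/column pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical command: add `amount` to the counter stored under `cell`.
final class IncrementCmd {
    final String cell;   // stands in for a row/family/qualifier triple
    final long amount;

    IncrementCmd(String cell, long amount) {
        this.cell = cell;
        this.amount = amount;
    }
}

// Hypothetical replica that applies an already-agreed log of increments.
final class CounterReplica {
    private final Map<String, Long> counters = new HashMap<>();

    // Deterministic: the result depends only on the prior state and the command.
    long apply(IncrementCmd inc) {
        long next = counters.getOrDefault(inc.cell, 0L) + inc.amount;
        counters.put(inc.cell, next);
        return next;
    }

    // Returns the value each increment would report back to its client.
    List<Long> applyAll(List<IncrementCmd> agreedLog) {
        List<Long> results = new ArrayList<>();
        for (IncrementCmd inc : agreedLog) {
            results.add(apply(inc));
        }
        return results;
    }
}
```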

            apurtell Andrew Kyle Purtell added a comment -

            We are going to attempt this with coprocessors.

            tlipcon Todd Lipcon added a comment -

            Cool! Looking forward to seeing it!

            larsgeorge Lars George added a comment -

            Ah, this is nice! I have asked about this many times and hinted at something like it, to avoid the dreaded "region is a goner for a while until redeployed" situation in high-availability environments. Using consensus brings us into the realm of a Dynamo-like RegionServer architecture, with all its pros and cons, the main con being that if strict consistency is required, you pay a performance penalty. That is the case with any other open source project implementing "R+W>N". Can't we employ ZooKeeper for this somehow?

            I love it!

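
The "R+W>N" rule mentioned above is a pigeonhole argument. With N replicas, any W replicas that acknowledged a write and any R replicas consulted by a read must share at least R+W-N members, so a read is guaranteed to reach a replica that saw the write exactly when R+W>N. The small Java sketch below (with illustrative names) computes that bound and checks it by brute force over every pair of replica subsets for small N.

```java
// Illustrative helpers for the Dynamo-style "R+W>N" condition.
final class QuorumOverlap {
    // Smallest possible number of replicas shared by a read set of size r and
    // a write set of size w, out of n replicas (pigeonhole bound), for r, w <= n.
    static int minOverlap(int n, int r, int w) {
        return Math.max(0, r + w - n);
    }

    // True when every read set must include a replica that acknowledged the write.
    static boolean readSeesLatestWrite(int n, int r, int w) {
        return minOverlap(n, r, w) > 0;
    }

    // Brute-force check of minOverlap for small n: try every pair of subsets,
    // encoded as bitmasks, with exactly r and w members.
    static int bruteForceMinOverlap(int n, int r, int w) {
        int best = Integer.MAX_VALUE;
        for (int read = 0; read < (1 << n); read++) {
            if (Integer.bitCount(read) != r) continue;
            for (int write = 0; write < (1 << n); write++) {
                if (Integer.bitCount(write) != w) continue;
                best = Math.min(best, Integer.bitCount(read & write));
            }
        }
        return best;
    }
}
```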

            apurtell Andrew Kyle Purtell added a comment -

            I just committed to doing this (eventually) on Quora, so I guess I'd better own it.

            apurtell Andrew Kyle Purtell added a comment -

            Looks like someone may have extracted ZAB from ZooKeeper some time around the 3.1.0 timeframe: https://svn.cs.hmc.edu/svn/linkedin08/zab-multibranch/

            jasonrutherglen Jason Rutherglen added a comment -

            @Andrew Can ZooKeeper be used as is to elect a master (i.e., why is ZAB necessary)? Is there a solidified design for this issue? Wouldn't simply using the MySQL replication paradigm be sufficient for the first implementation?

            apurtell Andrew Kyle Purtell added a comment -

            @Jason No, no design doc yet. I mean to write one when I can get a suitable block of time for this.

            ZAB is not necessary for basic read replicas that sync "eventually", i.e. basic MySQL-like master-slave replication. That would of course be the first step, since most users would only need that. My initial thought is that a region slave can get notice from the region owner via zk that a log has rolled, and process the new edits from there. The only significant detail to work through, I think, is that slaves will be under different memory pressure for their mix of regions than the owner. So for this, there could be shadow/temporary flush file storage for slaves that manage shadow memstores, while they share the permanent store files with the owner. We also need some zk-mediated coordination around splitting and compaction. Preferably the owner can do splits and compactions while leaving the shared store files alone until the last possible moment, then do a change notification via zk and an HDFS rename. Finally, when all slaves have stopped using the old store files, they can be garbage-collected.

            ZAB would be for a next step: getting cliques to all see and agree upon incoming edits, in effect master-master-master replication. This is blue-sky stuff. Regions would have higher availability than with single-region-server hosting, yet all clients would have a consistent view of the data contained therein at any moment. However, a region would need to be deployed to 2N+1 regionservers, where N is the number of expected concurrent node failures, or it would not be writable while lacking a quorum.
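
The garbage-collection step for shared store files can be sketched as reference counting. In this hypothetical Java sketch, the owner marks store files as retired after a compaction or split, slaves register and release the files they read, and a file becomes deletable only when it is both retired and unreferenced. The zk change notification and the HDFS rename are left out. `SharedStoreFiles` and its methods are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping for store files shared by a region owner and its slaves.
final class SharedStoreFiles {
    private final Map<String, Set<String>> readers = new HashMap<>(); // file -> slaves using it
    private final Set<String> retired = new HashSet<>();

    // A slave starts reading a store file; retired files can no longer be opened.
    void open(String file, String slave) {
        if (retired.contains(file)) {
            throw new IllegalStateException(file + " is already retired");
        }
        readers.computeIfAbsent(file, f -> new HashSet<>()).add(slave);
    }

    // A slave has switched to newer files and no longer needs this one.
    void release(String file, String slave) {
        Set<String> users = readers.get(file);
        if (users != null) {
            users.remove(slave);
        }
    }

    // Owner side: called after a compaction or split has replaced `file`.
    void retire(String file) {
        retired.add(file);
    }

    // Returns (and forgets) files that are retired and no longer referenced.
    List<String> collectGarbage() {
        List<String> deletable = new ArrayList<>();
        for (String file : retired) {
            Set<String> users = readers.get(file);
            if (users == null || users.isEmpty()) {
                deletable.add(file);
            }
        }
        for (String file : deletable) {
            retired.remove(file);
            readers.remove(file);
        }
        return deletable;
    }
}
```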

            jasonrutherglen Jason Rutherglen added a comment -

            @Andrew ZAB would be very cool, as then there wouldn't be a need for much logic when a master fails? However, I wonder about the write performance, since it means additional network overhead (to each node) per write?

            > Initial thoughts on this is a region slave can get notice from the region owner via zk that a log has rolled and process the new edits from there

            What is the expected latency between a write and reading the new value(s) from the slave? I'm not sure whether this means writing a series of WAL edits to a file, waiting for the file to reach a given limit, and then having the slave read from the newly flushed log in HDFS. If so, perhaps we'll want to implement replication that is more immediate (like MySQL)?

            jasonrutherglen Jason Rutherglen added a comment -

            Another way to implement this functionality is for the slave(s) to loop on the HLog.Reader? Are there any potential problems with that?

            I'm not sure how the Coprocessor implementation would look. Would the master push entries out? Isn't that somewhat problematic, e.g., when a slave goes down and an entry isn't sent or is skipped?

            apurtell Andrew Kyle Purtell added a comment -

            > Another way to implement this functionality is for the slave(s) to loop on the HLog.Reader?

            Yes.

            > Are there any potential problems with that?

            As with my "first cut" proposal above to scan HLogs upon roll, it would miss any edit not written with .writeToWAL(true).

            > I'm not sure how the Coprocessor implementation would look, would the master push entries out?

            Yes. It would either stream updates out of all the hooks for mutations, or run a consensus protocol from those same hooks in parallel with the WAL commit.

            > Isn't that somewhat problematic, eg, when a slave goes down, an entry isn't sent or is skipped?

            With simple streaming, when a slave goes down its replica becomes invalid and should simply be discarded. So I suppose there will be a period of time after that happens when a new slave is allocated and is behind until the master sends over the entire memstore. With ZAB, a transaction log and updates from peers are part of the protocol.
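
The "replica becomes invalid and should simply be discarded" rule can be sketched on the slave side as a small state machine. The sketch assumes, hypothetically, that the primary stamps each streamed edit with a per-region counter that increases by exactly one. HBase sequence ids of this era might not be dense per region, so a real design would likely need its own counter. A gap marks the replica invalid. It is never patched, only replaced after a full resync from the owner's memstore. `ReplicaStream` is an illustrative name.

```java
// Hypothetical slave-side check for a gap in the streamed edits.
// ASSUMPTION: the primary numbers the edits it streams for a region 1, 2, 3, ...
final class ReplicaStream {
    enum State { LIVE, INVALID }

    private long expected = 1;
    private State state = State.LIVE;

    // Feed the sequence number of each received edit; returns the new state.
    State onEdit(long seq) {
        if (state == State.INVALID) {
            return state;              // an invalid replica is discarded, not patched
        }
        if (seq < expected) {
            return state;              // duplicate delivery, already applied
        }
        if (seq > expected) {
            state = State.INVALID;     // gap: at least one edit was lost
            return state;
        }
        expected++;                    // in order: the edit would be applied here
        return state;
    }

    // Called for a replacement replica seeded from the owner's memstore up to lastSeq.
    void resetAfterResync(long lastSeq) {
        expected = lastSeq + 1;
        state = State.LIVE;
    }

    State state() {
        return state;
    }
}
```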

            jasonrutherglen Jason Rutherglen added a comment -

            Andrew, looping on the HLog sounds good. I guess the next thing to settle is how replication is defined in ZooKeeper. Should we implement something similar to HBASE-1295, or change that system to accommodate master -> slave(s)?

            apurtell Andrew Kyle Purtell added a comment -

            > Andrew, looping on the HLog sounds good.

            -1

            Directly accessing HFiles from a coprocessor should be discouraged; there has been general agreement on this in the discussions where it has come up. We created the WALObserver interface exactly for the purpose of capturing (and/or altering) the stream of edits going to the WAL.

            apurtell Andrew Kyle Purtell added a comment -

            I do have mixed feelings. Slaves would need to access foreign store files for regions that are not open on the RS, so tailing HLogs (more foreign files) at the slave is not unreasonable. But that is a major violation of the assumption that store files are private. Sharing store files will require a coordination dance between master and slaves upon compactions and flushes. Sharing active HLogs is more evil, given that the master may become involved.

            Also, the trouble with watching the WAL, either on the slave side at the file or on the master side with WALObserver, is that .writeToWAL(false) edits will go unnoticed until flush. I'd like to reevaluate whether this limitation could be acceptable. Your thoughts? A solution is for the master to stream edits to slaves from the Put, Delete, etc. post hooks via synchronous replication (or a ZAB transaction). It could also be done via asynchronously drained replication queues that don't block the current client operation unless full, but then one should worry about increasing heap pressure.
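
The "asynchronously drained replication queues" option can be sketched with a bounded `ArrayBlockingQueue`. The post-mutation hook enqueues, a separate shipping thread (not shown) drains batches, and the fixed capacity caps the extra heap the queue can use. Here `enqueue` blocks the client only when the queue is full, while `tryEnqueue` shows the non-blocking alternative that leaves the full-queue decision to the caller. `ReplicationQueue` is a hypothetical name, not a coprocessor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical per-region replication queue, filled from a post-mutation hook
// and drained by a separate shipping thread.
final class ReplicationQueue<E> {
    private final BlockingQueue<E> queue;

    ReplicationQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Client write path: returns at once while there is room, blocks only when full.
    void enqueue(E edit) throws InterruptedException {
        queue.put(edit);
    }

    // Non-blocking variant: returns false instead of waiting when the queue is full.
    boolean tryEnqueue(E edit) {
        return queue.offer(edit);
    }

    // Shipping thread: takes up to `max` queued edits, in order, without blocking.
    List<E> drain(int max) {
        List<E> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }

    int size() {
        return queue.size();
    }
}
```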

            jasonrutherglen Jason Rutherglen added a comment -

            > A solution is for the master to stream edits to slaves from Put, Delete, etc. post hooks via synchronous replication (or ZAB transaction). Could also be via asynchronously drained replication queues that don't block the current client operation unless full, but one should worry about increasing heap pressure

            Maybe we should call this 'push'-based Coprocessor replication. A queue would probably be necessary, since if a slave server goes down we'd want to mitigate the errant network calls. Would the push be multi-threaded?

            I think the MySQL approach is the slave(s) connect to the master, then read the transaction log starting from a given sequence id. The Coprocessor doesn't enable that?

            apurtell Andrew Kyle Purtell added a comment -

            > I think the MySQL approach is the slave(s) connect to the master, then read the transaction log starting from a given sequence id. The Coprocessor doesn't enable that?

            Coprocessors can register arbitrary RPC endpoints, so yes, slaves can contact the CP on the master to drain their respective queues in a pull model.
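
The pull model can be sketched as a per-region buffer of recent edits on the owner. A slave polls it MySQL-style, passing the last sequence id it applied. This hypothetical `EditLogBuffer` keeps only the newest `capacity` edits. If a slave asks for edits that have already been evicted, the call fails, which corresponds to a slave being too far behind and needing a resync. Exposing the buffer as a coprocessor endpoint over HBase RPC is not shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical owner-side buffer of recent edits for one region, polled by slaves.
final class EditLogBuffer {
    static final class Entry {
        final long seq;
        final String edit;

        Entry(long seq, String edit) {
            this.seq = seq;
            this.edit = edit;
        }
    }

    // The slave needs edits that are no longer retained, so it must resync.
    static final class TooFarBehindException extends RuntimeException {
        TooFarBehindException(String message) {
            super(message);
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final int capacity;
    private long nextSeq = 1;

    EditLogBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Owner side: append an edit, evicting the oldest one when the buffer is full.
    long append(String edit) {
        if (entries.size() == capacity) {
            entries.removeFirst();
        }
        long seq = nextSeq++;
        entries.addLast(new Entry(seq, edit));
        return seq;
    }

    // Slave side: up to `max` edits with seq > afterSeq, oldest first.
    List<Entry> pull(long afterSeq, int max) {
        long oldestRetained = entries.isEmpty() ? nextSeq : entries.peekFirst().seq;
        if (afterSeq + 1 < oldestRetained) {
            throw new TooFarBehindException(
                "needs seq " + (afterSeq + 1) + " but oldest retained is " + oldestRetained);
        }
        List<Entry> batch = new ArrayList<>();
        for (Entry e : entries) {
            if (batch.size() == max) {
                break;
            }
            if (e.seq > afterSeq) {
                batch.add(e);
            }
        }
        return batch;
    }
}
```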

            jasonrutherglen Jason Rutherglen added a comment -

            @Andrew What happens if the queue is drained and the client is well behind? I think it then falls into general recovery?

            apurtell Andrew Kyle Purtell added a comment -

            I'd agree. If a slave finds a replica too far behind, or desynchronized due to an error, the replica should be torn down.

            General recovery is then replacing the failed replica with a new one on a slave elsewhere.

            jasonrutherglen Jason Rutherglen added a comment -

            Sounds like the basic design is that there are N slaves that connect to one master using a socket-based protocol. There will be a socket connection open per region per slave. The Coprocessor will place edits into a per-region queue, and a separate thread will write the edits onto the slave socket connections.

            How will this look in ZooKeeper? Or should it function in the HMaster?

            apurtell Andrew Kyle Purtell added a comment -

            CP Endpoints operate over HBase RPC.

            > Sounds like the basic design is there are N slaves that connect to one master using a socket based protocol. There will be a socket connection open per-region per slave.

            That is not how HBase RPC works. One connection between the endpoints (in this case regionserver and regionserver) is established on demand, reaped when idle too long, and multiplexed over in the meantime.

            > The Coprocessor will place edits into a per region queue

            And spill the queues under heap pressure, presumably. Or, in the alternative, give up on a too-laggy slave and have it killed to avoid blowing out the heap.

            > Or should it function in the HMaster?

            No, we need to think of the HMaster as always on the verge of going away, to be supplanted by ZooKeeper-mediated distributed actions.
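
The "give up on a too-laggy slave" alternative can be sketched as a per-slave byte budget on the owner. In this hypothetical `SlaveFanout`, each published edit is queued for every live slave. A slave whose unacknowledged backlog would exceed the budget is dropped and reported, so that its replica can be torn down and rebuilt elsewhere, rather than spilling to disk or growing the heap. Sizes are raw byte-array lengths, ignoring Java object overhead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical owner-side fan-out with a bounded backlog per slave.
final class SlaveFanout {
    private final long maxPendingBytes;
    private final Map<String, Deque<byte[]>> pending = new HashMap<>();
    private final Map<String, Long> pendingBytes = new HashMap<>();
    private final Set<String> evicted = new HashSet<>();

    SlaveFanout(long maxPendingBytes) {
        this.maxPendingBytes = maxPendingBytes;
    }

    void addSlave(String slave) {
        pending.put(slave, new ArrayDeque<>());
        pendingBytes.put(slave, 0L);
    }

    // Queue an edit for every live slave; evict any slave that would go over budget.
    void publish(byte[] edit) {
        for (String slave : new ArrayList<>(pending.keySet())) {
            long bytes = pendingBytes.get(slave) + edit.length;
            if (bytes > maxPendingBytes) {
                pending.remove(slave);
                pendingBytes.remove(slave);
                evicted.add(slave);      // replica to be torn down and rebuilt
            } else {
                pending.get(slave).addLast(edit);
                pendingBytes.put(slave, bytes);
            }
        }
    }

    // A slave acknowledges that it has applied its oldest pending edit.
    void ack(String slave) {
        Deque<byte[]> q = pending.get(slave);
        if (q == null || q.isEmpty()) {
            return;
        }
        byte[] done = q.removeFirst();
        pendingBytes.put(slave, pendingBytes.get(slave) - done.length);
    }

    boolean isLive(String slave) {
        return pending.containsKey(slave);
    }

    Set<String> evicted() {
        return evicted;
    }
}
```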

            jasonrutherglen Jason Rutherglen added a comment -

            > That is not how HBase RPC works. One connection between the endpoints (in this case regionserver and regionserver) is established upon demand, reaped when idle too long, and multiplexed over in the meantime.

            OK, great. The replication master will need to examine ZK and find out which slaves to connect to over RPC.

            > And spill the queues under heap pressure presumably. Or give up on a too laggy slave and have it killed to avoid blowing out heap in the alternative

            Spilling would probably add too much complexity (e.g., where would it spill to?). I think we need to define when a slave is too far behind, and then assume it'll need to refresh itself when that happens (on a per-region basis, or does the entire RS need to recover?).

            > No we need to think of the HMaster as always on the verge of going away to be supplanted by ZooKeeper mediated distributed actions.

            OK, good to know. We should hammer out this part of the design before implementing.

            jasonrutherglen Jason Rutherglen added a comment -

            In the context of read replicas, I don't think the push model works, because it's difficult for the master to determine how far a slave has gotten in consuming a stream of events. Instead, could the slaves read off the queue (per region)? A slave is behind when its sequence ID is behind the last item in the queue?

            I think what's nice is that HBase seems to have built-in conflict resolution. However, on the slave, will a Put use a local timestamp or the one from the master?

            apurtell Andrew Kyle Purtell added a comment -

            This has been effectively superseded by HBASE-10070.

            People

              Assignee: Unassigned
              Reporter: tlipcon Todd Lipcon
              Votes: 3
              Watchers: 24
