Details
- Improvement
- Status: Resolved
- Major
- Resolution: Fixed
- 3.0.0-alpha-1, 1.7.0, 2.4.1, 1.8.0
- None
- None
- Reviewed
Description
Sometimes we want to run a MapReduce job whose source cluster and destination cluster are different, for example for data migration or batch jobs, as in the following:
// The job itself is configured against the source cluster.
Configuration conf = HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), sourceClusterKey);
final Job job = Job.getInstance(conf, jobName);
// ...
FileOutputFormat.setOutputPath(job, new Path(outputPath));

Scan scan = createScanner();
TableMapReduceUtil.initTableMapperJob(
    sourceTableName, scan, Mapper.class, ImmutableBytesWritable.class, Put.class, job);

// ConnectionFactory.createConnection takes a Configuration, not a cluster key,
// so build one for the destination cluster the same way as for the source.
Configuration destinationConf =
    HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), destinationClusterKey);
try (Connection con = ConnectionFactory.createConnection(destinationConf);
     Table table = con.getTable(destinationTableName);
     RegionLocator regionLocator = con.getRegionLocator(destinationTableName)) {
  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
}

return job.waitForCompletion(true) ? 0 : 1;
In this setup, HFileOutputFormat2 does not create locality-sensitive HFiles.
We got the following exception:
2021-02-24 19:55:48,298 WARN [main] org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2: there's something wrong when locating rowkey: xxxxxxxxxxxx
org.apache.hadoop.hbase.TableNotFoundException: Table 'table' was not found, got: XXXX.
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1302)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1181)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1165)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1122)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:957)
at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:74)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:216)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:78)
at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:43)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
This happens because HFileOutputFormat2 creates its connection from the task configuration, which is configured for the source cluster.
As a result, it connects to the source cluster and tries to look up region locations for a table that is expected to exist in the destination cluster:
InetSocketAddress[] favoredNodes = null;
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
  HRegionLocation loc = null;
  String tableName = Bytes.toString(tableNameBytes);
  if (tableName != null) {
    try (Connection connection = ConnectionFactory.createConnection(conf);
         RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
      loc = locator.getRegionLocation(rowKey);
    } catch (Throwable e) {
      LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey), tableName, e);
      loc = null;
    }
  }
  if (null == loc) {
    LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
  } else {
    LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
    InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
    if (initialIsa.isUnresolved()) {
      LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
    } else {
      LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
      favoredNodes = new InetSocketAddress[] { initialIsa };
    }
  }
}
wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
HFileOutputFormat2 should be aware of the destination cluster when the source and destination clusters differ, so that it can generate locality-sensitive HFiles correctly.
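One way to picture this request is as a configuration overlay: the driver records the destination cluster's connection settings in the job configuration under a dedicated prefix, and the task applies them on top of its own (source-cluster) settings before looking up region locations. The sketch below is only an illustration of that idea, not the fix that was committed. Plain `Map` objects stand in for Hadoop `Configuration`, and the class name, method names, and the `example.output.destination.` prefix are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: plain maps stand in for Hadoop/HBase Configuration objects. */
final class LocatorConfSketch {
  // Hypothetical prefix for destination-cluster overrides; not an actual HBase key.
  static final String DEST_PREFIX = "example.output.destination.";

  /** Driver side: copy the destination cluster's settings into the job conf under the prefix. */
  static void recordDestination(Map<String, String> jobConf, Map<String, String> destConf) {
    for (Map.Entry<String, String> e : destConf.entrySet()) {
      jobConf.put(DEST_PREFIX + e.getKey(), e.getValue());
    }
  }

  /**
   * Task side: build the settings to use for region lookup.
   * Starts from the task conf (source cluster) and lets any recorded
   * destination settings override the matching keys. With no recorded
   * destination, the result equals the task conf, which matches the
   * single-cluster behaviour.
   */
  static Map<String, String> locatorConf(Map<String, String> taskConf) {
    Map<String, String> result = new HashMap<>(taskConf);
    for (Map.Entry<String, String> e : taskConf.entrySet()) {
      if (e.getKey().startsWith(DEST_PREFIX)) {
        result.put(e.getKey().substring(DEST_PREFIX.length()), e.getValue());
      }
    }
    return result;
  }
}
```

In the driver, `recordDestination` would be called next to `configureIncrementalLoad`. In the record writer, the connection used for `getRegionLocation` would then be created from the result of `locatorConf` instead of the raw task configuration.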