Status: Open
Resolution: Unresolved
Impala 2.12.0
If a catalog object that is about to be compressed does not fit into a 2GB buffer (the documented limit is 0x7E000000 bytes of uncompressed data), compression fails with an error.
/// Compresses a serialized catalog object using LZ4 and stores it back in 'dst'. Stores /// the size of the uncompressed catalog object in the first sizeof(uint32_t) bytes of /// 'dst'. The compression fails if the uncompressed data size exceeds 0x7E000000 bytes. Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* dst) WARN_UNUSED_RESULT;
CatalogServer::AddPendingTopicItem() calls CompressCatalogObject(), and is itself invoked from the following JNI entry point:
// Add a catalog update to pending_topic_updates_. extern "C" JNIEXPORT jboolean JNICALL Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env, jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong version, jbyteArray serialized_object, jboolean deleted) { std::string key_string; { JniUtfCharGuard key_str; if (!JniUtfCharGuard::create(env, key, &key_str).ok()) { return static_cast<jboolean>(false); } key_string.assign(key_str.get()); } JniScopedArrayCritical obj_buf; if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) { return static_cast<jboolean>(false); } reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)-> AddPendingTopicItem(std::move(key_string), version, obj_buf.get(), static_cast<uint32_t>(obj_buf.size()), deleted); return static_cast<jboolean>(true); }
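However, the JNI wrapper that calls AddPendingTopicItem() discards its return value and returns true once the call has been made, so a compression failure never reaches the Java side.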
Recently, as part of IMPALA-10076, the return value started being propagated to the Java caller:
- if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key, - obj.catalog_version, data, delete)) { + int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, + v1Key, obj.catalog_version, data, delete); + if (actualSize < 0) { LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete=" + delete + ", data_size=" + data.length); + } else if (summary != null && obj.type == HDFS_PARTITION) { + summary.update(true, delete, obj.hdfs_partition.partition_name, + obj.catalog_version, data.length, actualSize); } }
As a result, CatalogServiceCatalog::addCatalogObject() now logs an error message, but the catalog update still does not go through:
if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) { String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key; byte[] data = serializer.serialize(obj); int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key, obj.catalog_version, data, delete); if (actualSize < 0) { LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete=" + delete + ", data_size=" + data.length); } else if (summary != null && obj.type == HDFS_PARTITION) { summary.update(true, delete, obj.hdfs_partition.partition_name, obj.catalog_version, data.length, actualSize); } }
It is not clear what the right behavior would be: we could either handle the compression failure and retry with more aggressive compression, or find another way to unblock the catalog update.