Details
-
Bug
-
Status: Resolved
-
Normal
-
Resolution: Fixed
-
None
-
None
-
Normal
Description
While working on CASSANDRA-10236 I discovered that CFS.loadNewSSTables() doesn't work for pre-3.0 sstables; it only works for version "ma" sstables.
To be clear: starting C* with 2.0, 2.1 or 2.2 sstables works, but loading new sstables at runtime doesn't.
Issues with CFS.loadNewSSTables() discovered so far:
- MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet) returns null for MetadataType.HEADER, which results in an NPE later when MetadataSerializer.serialize executes Collections.sort.
- After working around the previous issue, it turns out that it couldn't load the digest file, since Component.DIGEST is a singleton which refers to CRC32, but pre-3.0 sstables use Adler32.
- After working around that one, it fails in StreamingHistogram$StreamingHistogramSerializer.deserialize as maxBinSize==Integer.MAX_VALUE.
As loading legacy sstables works fine during startup, I assume my workarounds are not correct.
For reference, this commit contains a ton of legacy sstables (simple, counter, clustered and clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTableTest to read these tables using CFS.loadNewSSTables().
LegacySSTablesTest.txt
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index d2922cc..1be6450 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -18,6 +18,9 @@ package org.apache.cassandra.io.sstable; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; @@ -27,10 +30,15 @@ import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator; @@ -43,6 +51,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamPlan; @@ -57,6 +66,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes; */ public class LegacySSTableTest { + private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class); + public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root"; public static final String KSNAME = "Keyspace1"; public static final String CFNAME = 
"Standard1"; @@ -64,6 +75,8 @@ public class LegacySSTableTest public static Set<String> TEST_DATA; public static File LEGACY_SSTABLE_ROOT; + public static final String[] legacyVersions = {"jb", "ka", "la"}; + @BeforeClass public static void defineSchema() throws ConfigurationException { @@ -208,4 +221,65 @@ public class LegacySSTableTest throw e; } } + + @Test + public void testLegacyCqlTables() throws Exception + { + QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); + + loadLegacyTables(); + } + + private void loadLegacyTables() throws IOException + { + for (String legacyVersion : legacyVersions) + { + logger.info("Preparing legacy version {}", legacyVersion); + + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion)); + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion)); + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion)); + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion)); + + loadLegacyTable("legacy_%s_simple", legacyVersion); + loadLegacyTable("legacy_%s_simple_counter", legacyVersion); + loadLegacyTable("legacy_%s_clust", legacyVersion); + loadLegacyTable("legacy_%s_clust_counter", legacyVersion); + + } + } + + private void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException + { + String table = String.format(tablePattern, legacyVersion); + + logger.info("Loading legacy table {}", table); + + ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table); + + for (File cfDir : cfs.getDirectories().getCFDirectories()) + { 
+ copySstables(legacyVersion, table, cfDir); + } + + cfs.loadNewSSTables(); + } + + private static void copySstables(String legacyVersion, String table, File cfDir) throws IOException + { + byte[] buf = new byte[65536]; + + for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles()) + { + if (file.isFile()) + { + File target = new File(cfDir, file.getName()); + int rd; + FileInputStream is = new FileInputStream(file); + FileOutputStream os = new FileOutputStream(target); + while ((rd = is.read(buf)) >= 0) + os.write(buf, 0, rd); + } + } + } }
broken-workaround
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 554c782..e953b1d 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -96,7 +96,7 @@ public class Verifier implements Closeable { validator = null; - if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists()) + if (new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists()) { validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor); validator.validate(); diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 54dd35b..d0405e4 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -34,6 +34,7 @@ public class Component public static final char separator = '-'; final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class); + public enum Type { // the base data for an sstable: the remaining components can be regenerated @@ -79,13 +80,17 @@ public class Component } } + private static final String DIGEST_CRC32_NAME = "Digest.crc32"; + private static final String DIGEST_ADLER32_NAME = "Digest.adler32"; + // singleton components for types that don't need ids public final static Component DATA = new Component(Type.DATA); public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX); public final static Component FILTER = new Component(Type.FILTER); public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO); public final static Component STATS = new Component(Type.STATS); - public final static Component DIGEST = new Component(Type.DIGEST); + public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, DIGEST_CRC32_NAME); + public final static Component DIGEST_ADLER32 = new 
Component(Type.DIGEST, DIGEST_ADLER32_NAME); public final static Component CRC = new Component(Type.CRC); public final static Component SUMMARY = new Component(Type.SUMMARY); public final static Component TOC = new Component(Type.TOC); @@ -138,11 +143,23 @@ public class Component case FILTER: component = Component.FILTER; break; case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; case STATS: component = Component.STATS; break; - case DIGEST: component = Component.DIGEST; break; case CRC: component = Component.CRC; break; case SUMMARY: component = Component.SUMMARY; break; case TOC: component = Component.TOC; break; case CUSTOM: component = new Component(Type.CUSTOM, path.right); break; + case DIGEST: + switch (path.right) + { + case DIGEST_CRC32_NAME: + component = Component.DIGEST_CRC32; + break; + case DIGEST_ADLER32_NAME: + component = Component.DIGEST_ADLER32; + break; + default: + throw new IllegalStateException(); + } + break; default: throw new IllegalStateException(); } diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 38829df..0db6f00 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer; import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer; import org.apache.cassandra.io.sstable.metadata.MetadataSerializer; import org.apache.cassandra.utils.Pair; +import org.apache.hadoop.mapred.JobTracker; import static org.apache.cassandra.io.sstable.Component.separator; @@ -344,4 +345,16 @@ public class Descriptor { return hashCode; } + + public Component digestComponent() + { + switch (version.compressedChecksumType()) + { + case Adler32: + return Component.DIGEST_ADLER32; + case CRC32: + return Component.DIGEST_CRC32; + } + throw new IllegalStateException(); + } } diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index bd21536..74e4b56 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional Component.STATS, Component.SUMMARY, Component.TOC, - Component.DIGEST)); + Component.DIGEST_CRC32)); if (metadata.params.bloomFilterFpChance < 1.0) components.add(Component.FILTER); diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 9a5eae8..a40c37a 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -122,7 +122,12 @@ public class MetadataSerializer implements IMetadataSerializer in.seek(offset); component = type.serializer.deserialize(descriptor.version, in); } - components.put(type, component); + if (component == null) + { + assert type != MetadataType.HEADER || !descriptor.version.storeRows(); + } + else + components.put(type, component); } return components; } diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 70cd860..b88f4f2 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -109,7 +109,7 @@ public class DataIntegrityMetadata { this.descriptor = descriptor; checksum = descriptor.version.uncompressedChecksumType().newInstance(); - digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST))); + digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(descriptor.digestComponent()))); 
dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA))); try { @@ -211,7 +211,7 @@ public class DataIntegrityMetadata public void writeFullChecksum(Descriptor descriptor) { - File outFile = new File(descriptor.filenameFor(Component.DIGEST)); + File outFile = new File(descriptor.filenameFor(descriptor.digestComponent())); try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8)) { out.write(String.valueOf(fullChecksum.getValue())); diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 920eee0..1420cae 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -173,7 +173,7 @@ public class FileUtils public static void renameWithConfirm(File from, File to) { - assert from.exists(); + assert from.exists() : String.format("File to rename does not exist: %s", from.getPath()); if (logger.isDebugEnabled()) logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath()))); // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it, diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java index 13ce0c1..0233169 100644 --- a/test/unit/org/apache/cassandra/db/VerifyTest.java +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -275,11 +275,11 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"); + RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), "rw"); Long correctChecksum = Long.parseLong(file.readLine()); file.close(); - writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); + writeChecksum(++correctChecksum, 
sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())); try (Verifier verifier = new Verifier(cfs, sstable, false)) { @@ -315,7 +315,7 @@ public class VerifyTest file.close(); // Update the Digest to have the right Checksum - writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST)); + writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())); try (Verifier verifier = new Verifier(cfs, sstable, false)) {
Attachments
Issue Links
- links to