Details
-
Test
-
Status: Resolved
-
Major
-
Resolution: Works for Me
-
None
-
None
-
None
Description
So basically i was testing to write some unit test - for S3AInputStream readFully Method
package org.apache.hadoop.fs.s3a;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
public class TestReadFullyAndPositionalRead {
private S3AFileSystem fs;
private S3AInputStream input;
private S3Client s3;
private static final String EMPTY = "";
private static final String INPUT = "test_content";
@Before
public void setUp() throws IOException
public Configuration createConfiguration()
{ Configuration conf = new Configuration(); conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class, S3ClientFactory.class); // use minimum multipart size for faster triggering conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE); conf.setInt(Constants.S3A_BUCKET_PROBE, 1); // this is so stream draining is always blocking, allowing assertions to be safely made without worrying about any race conditions conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE); // set the region to avoid the getBucketLocation on FS init. conf.set(AWS_REGION, "eu-west-1"); return conf; } @Test
public void testReadFullyFromBeginning() throws IOException
@Test
public void testReadFullyWithOffsetAndLength() throws IOException
@Test
public void testReadFullyWithOffsetBeyondStream() throws IOException
private S3AInputStream getMockedS3AInputStream(String input)
{ Path path = new Path("test-path"); String eTag = "test-etag"; String versionId = "test-version-id"; String owner = "test-owner"; S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0, path, input.length(), owner, eTag, versionId); S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes( fs.getBucket(), path, fs.pathToKey(path), fs.getS3EncryptionAlgorithm(), new EncryptionSecrets().getEncryptionKey(), eTag, versionId, input.length()); S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE); return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes, getMockedInputStreamCallback(input), s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), BlockingThreadPoolExecutorService.newInstance(2, 40, 60, TimeUnit.SECONDS, "s3a-bounded")); } private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(String input) {
GetObjectResponse objectResponse = GetObjectResponse.builder().eTag("test-etag").build();
ResponseInputStream<GetObjectResponse>[] responseInputStreams = new ResponseInputStream[]
;
return new S3AInputStream.InputStreamCallbacks() {
private Integer mockedS3ObjectIndex = 0;
@Override
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
mockedS3ObjectIndex++;
if (mockedS3ObjectIndex == 3)
return responseInputStreams[min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
}
@Override
public GetObjectRequest.Builder newGetRequestBuilder(String key)
@Override
public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task)
@Override
public void close() {
}
};
}
private ResponseInputStream<GetObjectResponse> getMockedInputStream(
GetObjectResponse response, boolean success, String input) {
FilterInputStream stream = new FilterInputStream(AbortableInputStream.create(
IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
})) {
@Override
public void close() throws IOException {
super.close();
if (!success)
}
};
return new ResponseInputStream<>(response, stream);
}
}
Now this -
[ERROR] TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 expected:<"[con]t"> but was:<"[tes]t">
is the failure its not adhering to the position parameter and reading the inital bytes only
What is the expectation of the readFully Function in S3AInputStream?