Hadoop Common / HADOOP-19220

S3A : S3AInputStream positioned readFully Expectation


Details

    • Type: Test
    • Status: Resolved
    • Priority: Major
    • Resolution: Works for Me
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: fs/s3
    • Labels: None

    Description

      I was writing a unit test for the readFully method of S3AInputStream:

      package org.apache.hadoop.fs.s3a;

      import java.io.EOFException;
      import java.io.FilterInputStream;
      import java.io.IOException;
      import java.io.InputStream;
      import java.net.SocketException;
      import java.net.URI;
      import java.nio.ByteBuffer;
      import java.nio.charset.Charset;
      import java.nio.charset.StandardCharsets;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.TimeUnit;

      import org.apache.commons.io.IOUtils;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
      import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
      import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
      import org.apache.hadoop.util.functional.CallableRaisingIOE;
      import org.assertj.core.api.Assertions;
      import org.junit.Before;
      import org.junit.Test;
      import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
      import software.amazon.awssdk.awscore.exception.AwsServiceException;
      import software.amazon.awssdk.core.ResponseInputStream;
      import software.amazon.awssdk.http.AbortableInputStream;
      import software.amazon.awssdk.services.s3.S3Client;
      import software.amazon.awssdk.services.s3.model.GetObjectRequest;
      import software.amazon.awssdk.services.s3.model.GetObjectResponse;

      import static java.lang.Math.min;
      import static java.nio.charset.StandardCharsets.UTF_8;
      import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
      import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
      import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
      import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
      import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
      import static org.apache.hadoop.util.functional.FutureIO.eval;
      import static org.assertj.core.api.Assertions.assertThat;
      import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
      import static org.mockito.ArgumentMatchers.any;
      import static org.mockito.Mockito.never;
      import static org.mockito.Mockito.verify;

      public class TestReadFullyAndPositionalRead {

        private S3AFileSystem fs;
        private S3AInputStream input;
        private S3Client s3;
        private static final String EMPTY = "";
        private static final String INPUT = "test_content";

        @Before
        public void setUp() throws IOException {
          Configuration conf = createConfiguration();
          fs = new S3AFileSystem();
          URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
          // Unset S3CSE property from config to avoid pathIOE.
          conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
          fs.initialize(uri, conf);
          s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
        }

        public Configuration createConfiguration() {
          Configuration conf = new Configuration();
          conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
              S3ClientFactory.class);
          // use minimum multipart size for faster triggering
          conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
          conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
          // this is so stream draining is always blocking, allowing assertions to be
          // safely made without worrying about any race conditions
          conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
          // set the region to avoid the getBucketLocation on FS init.
          conf.set(AWS_REGION, "eu-west-1");
          return conf;
        }

        @Test
        public void testReadFullyFromBeginning() throws IOException {
          input = getMockedS3AInputStream(INPUT);
          byte[] byteArray = new byte[INPUT.length()];
          input.readFully(0, byteArray, 0, byteArray.length);
          assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
        }

        @Test
        public void testReadFullyWithOffsetAndLength() throws IOException {
          input = getMockedS3AInputStream(INPUT);
          byte[] byteArray = new byte[4];
          input.readFully(5, byteArray, 0, 4);
          assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
        }

        @Test
        public void testReadFullyWithOffsetBeyondStream() throws IOException {
          input = getMockedS3AInputStream(INPUT);
          byte[] byteArray = new byte[10];
          assertThatExceptionOfType(EOFException.class)
              .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
        }

        private S3AInputStream getMockedS3AInputStream(String input) {
          Path path = new Path("test-path");
          String eTag = "test-etag";
          String versionId = "test-version-id";
          String owner = "test-owner";
          S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0, path,
              input.length(), owner, eTag, versionId);
          S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
              fs.getBucket(),
              path,
              fs.pathToKey(path),
              fs.getS3EncryptionAlgorithm(),
              new EncryptionSecrets().getEncryptionKey(),
              eTag,
              versionId,
              input.length());
          S3AReadOpContext s3AReadOpContext =
              fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
          return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
              getMockedInputStreamCallback(input),
              s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
              BlockingThreadPoolExecutorService.newInstance(2, 40, 60,
                  TimeUnit.SECONDS, "s3a-bounded"));
        }

        private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(String input) {
          GetObjectResponse objectResponse = GetObjectResponse.builder().eTag("test-etag").build();
          // the first two streams close cleanly; the third fails on close
          ResponseInputStream<GetObjectResponse>[] responseInputStreams =
              new ResponseInputStream[] {
                  getMockedInputStream(objectResponse, true, input),
                  getMockedInputStream(objectResponse, true, input),
                  getMockedInputStream(objectResponse, false, input)
              };
          return new S3AInputStream.InputStreamCallbacks() {
            private Integer mockedS3ObjectIndex = 0;

            @Override
            public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
              mockedS3ObjectIndex++;
              // the third GetObject call simulates a service failure
              if (mockedS3ObjectIndex == 3) {
                throw AwsServiceException.builder()
                    .message("Failed to get S3Object")
                    .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
                    .build();
              }
              return responseInputStreams[min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
            }

            @Override
            public GetObjectRequest.Builder newGetRequestBuilder(String key) {
              return GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
            }

            @Override
            public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task) {
              return eval(task);
            }

            @Override
            public void close() {
            }
          };
        }

        private ResponseInputStream<GetObjectResponse> getMockedInputStream(
            GetObjectResponse response, boolean success, String input) {
          FilterInputStream stream = new FilterInputStream(AbortableInputStream.create(
              IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
              })) {
            @Override
            public void close() throws IOException {
              super.close();
              // simulate a connection abort when the stream is flagged as unsuccessful
              if (!success) {
                throw new SocketException("Socket closed");
              }
            }
          };
          return new ResponseInputStream<>(response, stream);
        }

      }

      Now this is the failure I get:

      [ERROR] TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 expected:<"[con]t"> but was:<"[tes]t">

      The read is not adhering to the position parameter; it is only reading the initial bytes of the object.
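      My suspicion (an assumption on my part, not something I have verified) is that the mocked getObject is part of the problem: it always returns the full "test_content" stream and ignores the Range that, as far as I understand, the stream's reopen sets on the GetObjectRequest, so every positioned read starts again at byte 0. A range-aware variant of the mock might look roughly like this sketch (the "bytes=start-end" parsing is my own, and java.io.ByteArrayInputStream would need to be added to the imports):

      @Override
      public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
        byte[] data = INPUT.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        int end = data.length - 1;
        // honour the "bytes=start-end" range set by the stream's reopen, if present
        String range = request.range();
        if (range != null && range.startsWith("bytes=")) {
          String[] bounds = range.substring("bytes=".length()).split("-");
          start = Integer.parseInt(bounds[0]);
          if (bounds.length > 1 && !bounds[1].isEmpty()) {
            end = Math.min(Integer.parseInt(bounds[1]), data.length - 1);
          }
        }
        // serve only the requested slice of the object
        InputStream slice = new ByteArrayInputStream(data, start, end - start + 1);
        return new ResponseInputStream<>(
            GetObjectResponse.builder().eTag("test-etag").build(),
            AbortableInputStream.create(slice, () -> {
            }));
      }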

      What is the expected behaviour of the readFully method in S3AInputStream?
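      For context, this is what I expect based on my reading of the PositionedReadable / FSDataInputStream contract (so the sketch below is my interpretation, not code taken from S3A); it assumes a freshly opened stream over an object whose bytes are "test_content", and reuses the same static imports as the test above:

      // Sketch of the positioned readFully behaviour I am expecting, written
      // against the generic FSDataInputStream API rather than S3AInputStream.
      static void expectedReadFullyBehaviour(org.apache.hadoop.fs.FSDataInputStream in)
          throws IOException {
        byte[] buf = new byte[4];

        // read exactly 4 bytes starting at absolute offset 5 of "test_content",
        // i.e. "cont", no matter where the stream cursor currently is
        in.readFully(5, buf, 0, 4);
        assertThat(new String(buf, UTF_8)).isEqualTo("cont");

        // a positioned read should not move the stream's own position
        // (the stream was freshly opened, so getPos() should still be 0)
        assertThat(in.getPos()).isEqualTo(0L);

        // asking for bytes beyond the end of the object should raise
        // EOFException rather than return a short read
        assertThatExceptionOfType(EOFException.class)
            .isThrownBy(() -> in.readFully(20, new byte[10], 0, 10));
      }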

People

    • Assignee: Unassigned
    • Reporter: Vinay Devadiga (vinay._.devadiga)
    • Votes: 0
    • Watchers: 2