Flink / FLINK-22584

Use protobuf-shaded in StateFun core.

Details

    Description

      We have a statefun-protobuf-shaded module that was introduced for the remote Java SDK.

      We can use it to shade Protobuf internally and reduce the dependency surface.

      The major hurdle we need to overcome is that, in embedded functions, we have to be able to accept instances of Protobuf messages generated by the user.

      For example:

      UserProfile userProfile = UserProfile.newBuilder().build();
      context.send(..., userProfile);

      If we simply use the shaded Protobuf version, we will immediately get a ClassCastException, because the user's message classes are compiled against the unshaded Protobuf classes.
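The failure mode can be reproduced without Protobuf at all. The sketch below is only an illustration: `UnshadedMessage`, `ShadedMessage`, and `naiveSerialize` are hypothetical stand-ins for the original Protobuf `Message` interface, its relocated copy in the shaded module, and a core that casts user objects to the shaded type. They are not StateFun or Protobuf APIs.

```java
// Stand-ins only: these nested types simulate the two Protobuf runtimes.
final class ShadedCastSketch {

  /** Plays the role of the user's (unshaded) Protobuf message interface. */
  interface UnshadedMessage {
    byte[] toByteArray();
  }

  /** Plays the role of the same interface after relocation by the shaded module. */
  interface ShadedMessage {
    byte[] toByteArray();
  }

  /** Plays the role of a user-generated class, compiled against the unshaded runtime. */
  static final class UserProfile implements UnshadedMessage {
    @Override
    public byte[] toByteArray() {
      return new byte[] {1, 2, 3};
    }
  }

  /** What a naive core might do with the object handed to context.send(...). */
  static byte[] naiveSerialize(Object userMessage) {
    // Throws ClassCastException: UserProfile does not implement ShadedMessage,
    // even though both interfaces declare identical methods.
    return ((ShadedMessage) userMessage).toByteArray();
  }

  /** Returns true if the naive cast fails with a ClassCastException. */
  static boolean failsWithClassCast(Object userMessage) {
    try {
      naiveSerialize(userMessage);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }
}
```

Two types with the same shape but different fully qualified names are unrelated to the JVM, which is why relocating Protobuf inside the core breaks a direct cast of user messages.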

      One way to overcome this is to use reflection: look up the well-known methods on the generated classes and call toByteArray() / parseFrom() reflectively.
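A minimal sketch of the reflective approach, assuming the message type exposes a public `toByteArray()` instance method and a public static `parseFrom(byte[])`, as Protobuf-generated Java classes do. `FakeMessage` is a hypothetical stand-in so the example runs without Protobuf on the classpath, and `toBytes` / `fromBytes` are illustrative helper names.

```java
import java.lang.reflect.Method;

final class ReflectiveCallSketch {

  /** Hypothetical stand-in for a generated message class. */
  public static final class FakeMessage {
    private final byte[] payload;

    public FakeMessage(byte[] payload) {
      this.payload = payload.clone();
    }

    public byte[] toByteArray() {
      return payload.clone();
    }

    public static FakeMessage parseFrom(byte[] bytes) {
      return new FakeMessage(bytes);
    }
  }

  /** Serializes any object that has a public toByteArray() method. */
  static byte[] toBytes(Object message) {
    try {
      Method toByteArray = message.getClass().getMethod("toByteArray");
      return (byte[]) toByteArray.invoke(message);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("toByteArray() is not callable on " + message.getClass(), e);
    }
  }

  /** Deserializes via the public static parseFrom(byte[]) method of the given type. */
  static <M> M fromBytes(Class<M> type, byte[] bytes) {
    try {
      Method parseFrom = type.getMethod("parseFrom", byte[].class);
      // parseFrom is static, so the receiver is null; the array is passed as one argument.
      return type.cast(parseFrom.invoke(null, (Object) bytes));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("parseFrom(byte[]) is not callable on " + type, e);
    }
  }
}
```

This version looks the methods up on every call. Caching the `Method` objects per type would avoid the repeated lookup, but each invocation would still go through reflection.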

      However, this will cause a significant slowdown, even when using MethodHandles.
      A small experiment that I did previously with ByteBuddy mitigates this by generating
      the accessors once, in pre-flight:

      package org.apache.flink.statefun.flink.common.protobuf.serde;

      import static net.bytebuddy.matcher.ElementMatchers.named;

      import java.io.InputStream;
      import java.io.OutputStream;
      import java.lang.reflect.InvocationTargetException;
      import java.lang.reflect.Method;
      import net.bytebuddy.ByteBuddy;
      import net.bytebuddy.dynamic.DynamicType;
      import net.bytebuddy.implementation.FixedValue;
      import net.bytebuddy.implementation.MethodCall;
      import net.bytebuddy.implementation.bytecode.assign.Assigner;

      final class ReflectiveProtobufSerde {

        @SuppressWarnings({"unchecked", "rawtypes"})
        static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
          try {
            DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
            // load the generated class next to the user's message class
            Class<? extends ProtobufSerde> writer = unloaded.load(type.getClassLoader()).getLoaded();
            return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
          } catch (Throwable e) {
            // keep the cause, so that a failed generation can be diagnosed
            throw new IllegalArgumentException(
                "Unable to generate a ProtobufSerde for " + type.getName(), e);
          }
        }

        @SuppressWarnings("rawtypes")
        private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
          // look up the well-known methods of the generated class, once
          Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
          Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
          Method getSerializedSizeMethod = type.getMethod("getSerializedSize");

          // get the message full name
          Method getDescriptorMethod = type.getMethod("getDescriptor");
          Object descriptor = getDescriptorMethod.invoke(null);
          Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
          String messageFullName = (String) getFullNameMethod.invoke(descriptor);

          // generate a ProtobufSerde subclass that calls these methods directly
          return new ByteBuddy()
              .subclass(ProtobufSerde.class)
              .typeVariable("M", type)
              .method(named("writeTo"))
              .intercept(
                  MethodCall.invoke(writeToMethod)
                      .onArgument(0)
                      .withArgument(1)
                      .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
              .method(named("parseFrom"))
              .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
              .method(named("getSerializedSize"))
              .intercept(
                  MethodCall.invoke(getSerializedSizeMethod)
                      .onArgument(0)
                      .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
              .method(named("getMessageFullName"))
              .intercept(FixedValue.value(messageFullName))
              .make();
        }
      }
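The `ProtobufSerde` type is not shown in this issue. The sketch below is a guess at its shape, inferred only from the method names and argument positions intercepted above, together with a hand-written class that does what the generated subclass would do for one message type. The interface signatures, `FakeMessage`, `FakeMessageSerde`, the `roundTrip` helper, and the full name `example.FakeMessage` are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class GeneratedSerdeSketch {

  /** Assumed shape of ProtobufSerde, inferred from the intercepted methods. */
  interface ProtobufSerde<M> {
    void writeTo(M message, OutputStream out) throws IOException;

    M parseFrom(InputStream in) throws IOException;

    int getSerializedSize(M message);

    String getMessageFullName();
  }

  /** Hypothetical stand-in for a generated message class. */
  static final class FakeMessage {
    final byte[] payload;

    FakeMessage(byte[] payload) {
      this.payload = payload.clone();
    }

    void writeTo(OutputStream out) throws IOException {
      out.write(payload);
    }

    static FakeMessage parseFrom(InputStream in) throws IOException {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      byte[] chunk = new byte[256];
      int n;
      while ((n = in.read(chunk)) != -1) {
        buffer.write(chunk, 0, n);
      }
      return new FakeMessage(buffer.toByteArray());
    }

    int getSerializedSize() {
      return payload.length;
    }
  }

  /** Hand-written equivalent of a generated serde: plain, direct method calls. */
  static final class FakeMessageSerde implements ProtobufSerde<FakeMessage> {
    @Override
    public void writeTo(FakeMessage message, OutputStream out) throws IOException {
      message.writeTo(out); // corresponds to onArgument(0).withArgument(1)
    }

    @Override
    public FakeMessage parseFrom(InputStream in) throws IOException {
      return FakeMessage.parseFrom(in); // static call with argument 0
    }

    @Override
    public int getSerializedSize(FakeMessage message) {
      return message.getSerializedSize(); // corresponds to onArgument(0)
    }

    @Override
    public String getMessageFullName() {
      return "example.FakeMessage"; // a constant fixed at generation time
    }
  }

  /** Writes a message through the serde and reads it back. */
  static <M> M roundTrip(ProtobufSerde<M> serde, M message) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream(serde.getSerializedSize(message));
      serde.writeTo(message, out);
      return serde.parseFrom(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the generated class contains ordinary method calls rather than reflective ones, the reflection cost is paid once per message type when the class is generated, not on every message.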
       

       

            People

              Assignee: Unassigned
              Reporter: Igal Shilman