Description
Dear Kafka Fellows,
currently, we are facing problems with Kafka Streams.
We try to transform a set of messages into a state store.
The functionality is working, but after a certain period the application returns the error
Key value store is not initialzed.
We tried alot of solutions, like using the Kafka events or loops to wait until the store is available again. But the system is not able to healh again.
Colleagues of us use the Kubernetes Health check to restart the application when this issues comes up. But we think this is not a proper solution.
What are you recommending?
Thanks a lot for your help
Our Code
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()") public GenericCacheable<List<MyIcpNotification>> getMyIcpNotificationsForUser(final String uuid, final String emailAddress) throws InterruptedException { if (!hasText(emailAddress)) { LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an invalid email address.", uuid); return new GenericCacheable<>(Collections.emptyList(), null); } if (keyValueStore == null) { initializeStore(uuid); } if (keyValueStore == null) { LOGGER.error("[{}]: Key value store is not initialized.", uuid); return new GenericCacheable<>(Collections.emptyList(), null); } final List<Command<MyIcpPayload>> commandList = keyValueStore.get(emailAddress); if (commandList == null) { return new GenericCacheable<>(Collections.emptyList(), null); } //@formatter:off final List<MyIcpNotification> list = commandList .stream() .map(this::mapToNotification) .collect(Collectors.toList()); //@formatter:on return new GenericCacheable<>(list, LocalDateTime.now()); }
private void initializeStore(final String uuid) throws InterruptedException { int counter = 0; while (counter < 5) { try { keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore()); return; } catch (final Exception e) { LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage()); Thread.sleep(1000); counter++; } } }
public KafkaStreams myIcpMessagesStream(@Qualifier("myIcpEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception { final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject()); final StoreBuilder<KeyValueStore<String, List<Command<MyIcpPayload>>>> keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>()); myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder); //@formatter:off myicpQueryStreamBuilder .stream(kafkaTopicNames.getMyIcpMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>())) .mapValues(this::mapPayloadToMyIcpPayload) .transformValues(() -> commandTransformer, storeName); //@formatter:on final KafkaStreams kafkaStreams = new KafkaStreams(myicpQueryStreamBuilder.build(), Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration())); kafkaStreams.start(); return kafkaStreams; }
public class CommandTransformer implements ValueTransformer<Command<MyIcpPayload>, List<Command<MyIcpPayload>>> { private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class); @Value("${ifx.notificationService.myicp.storeName}") private String storeName; @Value("${ifx.notificationService.myicp.maxStoreSize}") private int maxStoreSize; private KeyValueStore<String, List<Command<MyIcpPayload>>> keyValueStore; @Override public void init(final ProcessorContext context) { keyValueStore = (KeyValueStore<String, List<Command<MyIcpPayload>>>) context.getStateStore(storeName); } @Override @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress") public List<Command<MyIcpPayload>> transform(final Command<MyIcpPayload> value) { if (value == null) { return Collections.emptyList(); } final List<Command<MyIcpPayload>> listForUser = getCommandListForUser(value); if (isInvalidValue(value, listForUser)) { return listForUser; } if (listForUser.size() >= maxStoreSize) { listForUser.remove(0); } LOGGER.debug("[{}] current list [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size()); listForUser.add(value); keyValueStore.put(value.getPayload().getUser().getEmailAddress(), listForUser); LOGGER.debug("[{}] list after update [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size()); return listForUser; } private boolean isInvalidValue(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) { if (uuidAlreadyPresent(value, listForUser)) { return true; } final ZonedDateTime oldestDate = getOldestDateInList(listForUser); return nonNull(oldestDate) && oldestDate.isAfter(value.getHeader().getTimestamp()); } private ZonedDateTime getOldestDateInList(final List<Command<MyIcpPayload>> listForUser) { //@formatter:off return listForUser .stream() .map(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getTimestamp()) .max(ZonedDateTime::compareTo) .orElse(null); //@formatter:on } private boolean uuidAlreadyPresent(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) { return listForUser.stream().anyMatch(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getUuid().equalsIgnoreCase(value.getHeader().getUuid())); } private List<Command<MyIcpPayload>> getCommandListForUser(final Command<MyIcpPayload> value) { List<Command<MyIcpPayload>> listForUser = keyValueStore.get(value.getPayload().getUser().getEmailAddress()); if (isNull(listForUser)) { listForUser = new ArrayList<>(); } return listForUser; } @Override public void close() { // do nothing here } }