Handler.kt

  1. package com.hexagontk.messaging.rabbitmq

  2. import com.hexagontk.core.loggerOf
  3. import com.hexagontk.helpers.retry
  4. import com.hexagontk.core.media.MediaType
  5. import com.hexagontk.serialization.SerializationFormat
  6. import com.hexagontk.serialization.SerializationManager.formatOfOrNull
  7. import com.hexagontk.serialization.serialize
  8. import com.rabbitmq.client.AMQP.BasicProperties
  9. import com.rabbitmq.client.Channel
  10. import com.rabbitmq.client.ConnectionFactory
  11. import com.rabbitmq.client.DefaultConsumer
  12. import com.rabbitmq.client.Envelope
  13. import com.rabbitmq.client.ShutdownSignalException
  14. import java.lang.System.Logger
  15. import java.nio.charset.Charset
  16. import java.nio.charset.Charset.defaultCharset
  17. import java.util.concurrent.ExecutorService
  18. import kotlin.reflect.KClass
  19. import com.hexagontk.core.trace
  20. import com.hexagontk.core.warn
  21. import com.hexagontk.core.error
  22. import com.hexagontk.core.debug
  23. import com.hexagontk.serialization.parseMap

  24. /**
  25.  * Message handler that can reply messages to a reply queue.
  26.  *
  27.  * TODO Test content type support.
  28.  */
  29. internal class Handler<T : Any, R : Any> internal constructor (
  30.     connectionFactory: ConnectionFactory,
  31.     channel: Channel,
  32.     private val executor: ExecutorService,
  33.     private val type: KClass<T>,
  34.     private val handler: (T) -> R,
  35.     private val serializationFormat: SerializationFormat,
  36.     val decoder: (Map<String, *>) -> T
  37. ) : DefaultConsumer(channel) {

  38.     private companion object {

  39.         private const val RETRIES = 5
  40.         private const val DELAY = 50L
  41.     }

  42.     private val log: Logger = loggerOf(this::class)

  43.     private val client: RabbitMqClient by lazy {
  44.         RabbitMqClient(connectionFactory, serializationFormat = serializationFormat)
  45.     }

  46.     /** @see DefaultConsumer.handleDelivery */
  47.     override fun handleDelivery(
  48.         consumerTag: String, envelope: Envelope, properties: BasicProperties, body: ByteArray) {

  49.         executor.execute {
  50.             val charset = properties.contentEncoding ?: defaultCharset().name()
  51.             val correlationId = properties.correlationId
  52.             val replyTo = properties.replyTo
  53.             val contentType = properties.contentType
  54.                 ?.let { MediaType(it) }
  55.                 ?.let { formatOfOrNull(it) }
  56.                 ?: serializationFormat

  57.             try {
  58.                 log.trace { "Received message ($correlationId) in $charset" }
  59.                 val request = String(body, Charset.forName(charset))
  60.                 log.trace { "Message body:\n$request" }
  61.                 val input = decoder(request.parseMap(contentType))

  62.                 val response = handler(input)

  63.                 if (replyTo != null)
  64.                     handleResponse(response, replyTo, correlationId)
  65.             }
  66.             catch (ex: Exception) {
  67.                 log.warn(ex) { "Error processing message ($correlationId) in $charset" }

  68.                 if (replyTo != null)
  69.                     handleError(ex, replyTo, correlationId)
  70.             }
  71.             finally {
  72.                 retry(RETRIES, DELAY) {
  73.                     channel.basicAck(envelope.deliveryTag, false)
  74.                 }
  75.             }
  76.         }
  77.     }

  78.     /** @see DefaultConsumer.handleCancel */
  79.     override fun handleCancel(consumerTag: String?) {
  80.         log.error { "Unexpected cancel for the consumer $consumerTag" }
  81.     }

  82.     /** @see DefaultConsumer.handleCancelOk */
  83.     override fun handleCancelOk(consumerTag: String) {
  84.         log.debug { "Explicit cancel for the consumer $consumerTag" }
  85.     }

  86.     /** @see DefaultConsumer.handleShutdownSignal */
  87.     override fun handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) {
  88.         if (sig.isInitiatedByApplication) {
  89.             log.debug { "Consumer $consumerTag: shutdown is initiated by application. Ignoring it" }
  90.         }
  91.         else {
  92.             val msg = sig.localizedMessage ?: ""
  93.             log.debug { "Consumer $consumerTag shutdown error $msg" }
  94.         }
  95.     }

  96.     /** @see DefaultConsumer.handleConsumeOk */
  97.     override fun handleConsumeOk(consumerTag: String) {
  98.         log.debug { "Consumer $consumerTag has been registered" }
  99.     }

  100.     private fun handleResponse(response: R, replyTo: String, correlationId: String?) {
  101.         val output = when (response) {
  102.             is String -> response
  103.             is Int -> response.toString()
  104.             is Long -> response.toString()
  105.             else -> response.serialize(serializationFormat)
  106.         }

  107.         client.publish(replyTo, output, correlationId)
  108.     }

  109.     private fun handleError(exception: Exception, replyTo: String, correlationId: String?) {
  110.         val message = exception.message ?: ""
  111.         val errorMessage = message.ifBlank { exception.javaClass.name }
  112.         client.publish(replyTo, errorMessage, correlationId)
  113.     }
  114. }