NettyServerHandler.kt

  1. package com.hexagontk.http.server.netty

  2. import com.hexagontk.core.toText
  3. import com.hexagontk.handlers.Context
  4. import com.hexagontk.http.handlers.bodyToBytes
  5. import com.hexagontk.http.model.*
  6. import com.hexagontk.http.model.Cookie
  7. import com.hexagontk.http.handlers.HttpHandler
  8. import com.hexagontk.http.handlers.HttpContext
  9. import com.hexagontk.http.model.CookieSameSite.*
  10. import com.hexagontk.http.model.HttpCall
  11. import com.hexagontk.http.model.HttpResponse
  12. import io.netty.buffer.Unpooled
  13. import io.netty.channel.Channel
  14. import io.netty.channel.ChannelFutureListener.CLOSE
  15. import io.netty.channel.ChannelHandlerContext
  16. import io.netty.channel.ChannelInboundHandlerAdapter
  17. import io.netty.handler.codec.http.*
  18. import io.netty.handler.codec.http.HttpHeaderNames.*
  19. import io.netty.handler.codec.http.HttpHeaderValues.CHUNKED
  20. import io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE
  21. import io.netty.handler.codec.http.HttpMethod
  22. import io.netty.handler.codec.http.HttpMethod.GET
  23. import io.netty.handler.codec.http.HttpRequest
  24. import io.netty.handler.codec.http.HttpResponseStatus.*
  25. import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
  26. import io.netty.handler.codec.http.cookie.CookieHeaderNames.SameSite.*
  27. import io.netty.handler.codec.http.cookie.DefaultCookie
  28. import io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT as STRICT_ENCODER
  29. import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
  30. import io.netty.handler.ssl.SslHandler
  31. import io.netty.handler.ssl.SslHandshakeCompletionEvent
  32. import java.security.cert.X509Certificate
  33. import java.util.concurrent.Executor
  34. import java.util.concurrent.Flow.*
  35. import com.hexagontk.http.model.HttpRequest as HexagonHttpRequest

  36. internal class NettyServerHandler(
  37.     private val handlers: Map<HttpMethod, HttpHandler>,
  38.     executor: Executor?,
  39.     private val sslHandler: SslHandler?,
  40.     private val enableWebsockets: Boolean = true,
  41. ) : ChannelInboundHandlerAdapter() {

  42.     private var certificates: List<X509Certificate> = emptyList()
  43.     private val httpRequestProcessor: (ChannelHandlerContext, HttpRequest) -> Unit =
  44.         executor
  45.             ?.let { x ->
  46.                 { context: ChannelHandlerContext, nettyRequest: HttpRequest ->
  47.                     x.execute {
  48.                         try {
  49.                             readHttpRequest(context, nettyRequest)
  50.                         }
  51.                         catch (e: Exception) {
  52.                             exceptionCaught(context, e)
  53.                         }
  54.                     }
  55.                 }
  56.             }
  57.             ?: this::readHttpRequest

  58.     override fun channelRead(context: ChannelHandlerContext, nettyRequest: Any) {
  59.         if (nettyRequest is HttpRequest)
  60.             httpRequestProcessor(context, nettyRequest)
  61.     }

  62.     private fun readHttpRequest(context: ChannelHandlerContext, nettyRequest: HttpRequest) {
  63.         val result = nettyRequest.decoderResult()

  64.         if (result.isFailure)
  65.             throw IllegalStateException(result.cause())

  66.         val channel = context.channel()
  67.         val method = nettyRequest.method()
  68.         val pathHandler = handlers[method]

  69.         val headers = nettyRequest.headers()
  70.         val request = NettyRequestAdapter(method, nettyRequest, certificates, channel, headers)

  71.         if (pathHandler == null) {
  72.             writeResponse(context, request, HttpResponse(), HttpUtil.isKeepAlive(nettyRequest))
  73.             return
  74.         }

  75.         val resultContext = pathHandler.process(request)
  76.         val response = resultContext.event.response

  77.         val isWebSocket =
  78.             if (enableWebsockets) isWebsocket(headers, method, response.status)
  79.             else false

  80.         val body = response.body
  81.         val isSse = body is Publisher<*>

  82.         when {
  83.             isSse -> handleSse(context, request, response, body)
  84.             isWebSocket -> handleWebSocket(context, resultContext, response, nettyRequest, channel)
  85.             else -> writeResponse(context, request, response, HttpUtil.isKeepAlive(nettyRequest))
  86.         }
  87.     }

  88.     private fun isWebsocket(headers: HttpHeaders, method: HttpMethod, status: Int): Boolean {
  89.         val connection = headers[CONNECTION]?.lowercase()
  90.         val upgrade = headers[UPGRADE]?.lowercase()
  91.         return connection == "upgrade"
  92.             && upgrade == "websocket"
  93.             && method == GET
  94.             && status == ACCEPTED_202
  95.     }

  96.     @Suppress("UNCHECKED_CAST") // Body not cast to Publisher<HttpServerEvent> due to type erasure
  97.     private fun handleSse(
  98.         context: ChannelHandlerContext,
  99.         hexagonRequest: HttpRequestPort,
  100.         response: HttpResponsePort, body: Any,
  101.     ) {
  102.         val status = nettyStatus(response.status)
  103.         val nettyResponse = DefaultHttpResponse(HTTP_1_1, status)
  104.         val headers = nettyResponse.headers()

  105.         val hexagonHeaders = response.headers
  106.         if (hexagonHeaders.fields.isNotEmpty())
  107.             hexagonHeaders.all.map { (k, v) -> headers.add(k, v.map { it.text }) }

  108.         val hexagonCookies = response.cookies
  109.         if (hexagonCookies.isNotEmpty()) {
  110.             val cookies = nettyCookies(hexagonRequest.protocol.secure, hexagonCookies)
  111.             headers[SET_COOKIE] = STRICT_ENCODER.encode(cookies)
  112.         }

  113.         val contentType = response.contentType
  114.         if (contentType != null)
  115.             headers[CONTENT_TYPE] = contentType.text

  116.         headers[TRANSFER_ENCODING] = CHUNKED
  117.         headers[CONNECTION] = KEEP_ALIVE
  118.         context.writeAndFlush(nettyResponse)

  119.         // TODO Close when publisher is done
  120.         val publisher = body as Publisher<ServerEvent>
  121.         publisher.subscribe(object : Subscriber<ServerEvent> {
  122.             override fun onError(throwable: Throwable) {}

  123.             override fun onComplete() {
  124.                 context.close()
  125.             }

  126.             override fun onSubscribe(subscription: Subscription) {
  127.                 subscription.request(Long.MAX_VALUE)
  128.             }

  129.             override fun onNext(item: ServerEvent) {
  130.                 val eventData = Unpooled.copiedBuffer(item.eventData.toByteArray())
  131.                 context.writeAndFlush(DefaultHttpContent(eventData))
  132.             }
  133.         })
  134.     }

  135.     private fun handleWebSocket(
  136.         context: ChannelHandlerContext,
  137.         request: Context<HttpCall>,
  138.         response: HttpResponsePort,
  139.         nettyRequest: HttpRequest,
  140.         channel: Channel
  141.     ) {
  142.         val session = NettyWsSession(context, HttpContext(request))
  143.         val nettyWebSocketHandler = NettyWebSocketHandler(
  144.             session,
  145.             response.onBinary,
  146.             response.onText,
  147.             response.onPing,
  148.             response.onPong,
  149.             response.onClose,
  150.         )

  151.         context.pipeline().replace(this, "webSocketHandler", nettyWebSocketHandler)
  152.         wsHandshake(nettyRequest, channel)
  153.         session.(response.onConnect)()
  154.     }

  155.     private fun wsHandshake(nettyRequest: HttpRequest, channel: Channel) {
  156.         val host = nettyRequest.headers()["host"]
  157.         val uri = nettyRequest.uri()
  158.         val url = "ws://$host$uri"
  159.         val wsFactory = WebSocketServerHandshakerFactory(url, null, true)
  160.         val handShaker = wsFactory.newHandshaker(nettyRequest)

  161.         if (handShaker == null)
  162.             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel)
  163.         else
  164.             handShaker.handshake(channel, nettyRequest)
  165.     }

  166.     override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
  167.         if (evt is SslHandshakeCompletionEvent && sslHandler != null) {
  168.             val peerCertificates = sslHandler.engine().session.peerCertificates
  169.             certificates = peerCertificates.map { it as X509Certificate }
  170.         }
  171.     }

  172.     @Suppress("OVERRIDE_DEPRECATION") // Deprecated in base interface, but allowed in parent class
  173.     override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
  174.         val response = HttpResponse(bodyToBytes(cause.toText()), status = INTERNAL_SERVER_ERROR_500)
  175.         writeResponse(context, HexagonHttpRequest(), response, false)
  176.     }

  177.     private fun writeResponse(
  178.         context: ChannelHandlerContext,
  179.         hexagonRequest: HttpRequestPort,
  180.         hexagonResponse: HttpResponsePort,
  181.         keepAlive: Boolean,
  182.     ) {
  183.         val buffer = Unpooled.copiedBuffer(bodyToBytes(hexagonResponse.body))
  184.         val status = nettyStatus(hexagonResponse.status)
  185.         val response = DefaultFullHttpResponse(HTTP_1_1, status, buffer)

  186.         val headers = response.headers()
  187.         val hexagonHeaders = hexagonResponse.headers
  188.         if (hexagonHeaders.fields.isNotEmpty())
  189.             hexagonHeaders.all.map { (k, v) -> headers.add(k, v.map { it.text }) }

  190.         val hexagonCookies = hexagonResponse.cookies
  191.         if (hexagonCookies.isNotEmpty()) {
  192.             val cookies = nettyCookies(hexagonRequest.protocol.secure, hexagonCookies)
  193.             headers[SET_COOKIE] = STRICT_ENCODER.encode(cookies)
  194.         }

  195.         val contentType = hexagonResponse.contentType
  196.         if (contentType != null)
  197.             headers[CONTENT_TYPE] = contentType.text

  198.         if (keepAlive) {
  199.             headers.setInt(CONTENT_LENGTH, response.content().readableBytes())
  200.             headers[CONNECTION] = KEEP_ALIVE
  201.             context.writeAndFlush(response)
  202.         }
  203.         else {
  204.             context.writeAndFlush(response).addListener(CLOSE)
  205.         }
  206.     }

  207.     private fun nettyCookies(secureRequest: Boolean, hexagonCookies: List<Cookie>) =
  208.         hexagonCookies
  209.             .filter { if (secureRequest) true else !it.secure }
  210.             .map {
  211.                 DefaultCookie(it.name, it.value).apply {
  212.                     if (it.maxAge != -1L)
  213.                         setMaxAge(it.maxAge)
  214.                     isSecure = it.secure
  215.                     setPath(it.path)
  216.                     setDomain(it.domain)
  217.                     isHttpOnly = it.httpOnly
  218.                     it.domain?.let(::setDomain)
  219.                     it.sameSite
  220.                         ?.let { ss ->
  221.                             when (ss) {
  222.                                 STRICT -> Strict
  223.                                 LAX -> Lax
  224.                                 NONE -> None
  225.                             }
  226.                         }
  227.                         ?.let(::setSameSite)
  228.                 }
  229.             }

  230.     internal fun nettyStatus(status: Int): HttpResponseStatus =
  231.         when (status) {
  232.             CONTINUE_100 -> CONTINUE
  233.             SWITCHING_PROTOCOLS_101 -> SWITCHING_PROTOCOLS
  234.             PROCESSING_102 -> PROCESSING

  235.             OK_200 -> OK
  236.             CREATED_201 -> CREATED
  237.             ACCEPTED_202 -> ACCEPTED
  238.             NON_AUTHORITATIVE_INFORMATION_203 -> NON_AUTHORITATIVE_INFORMATION
  239.             NO_CONTENT_204 -> NO_CONTENT
  240.             RESET_CONTENT_205 -> RESET_CONTENT
  241.             PARTIAL_CONTENT_206 -> PARTIAL_CONTENT
  242.             MULTI_STATUS_207 -> MULTI_STATUS

  243.             MULTIPLE_CHOICES_300 -> MULTIPLE_CHOICES
  244.             MOVED_PERMANENTLY_301 -> MOVED_PERMANENTLY
  245.             FOUND_302 -> FOUND
  246.             SEE_OTHER_303 -> SEE_OTHER
  247.             NOT_MODIFIED_304 -> NOT_MODIFIED
  248.             USE_PROXY_305 -> USE_PROXY
  249.             TEMPORARY_REDIRECT_307 -> TEMPORARY_REDIRECT
  250.             PERMANENT_REDIRECT_308 -> PERMANENT_REDIRECT

  251.             BAD_REQUEST_400 -> BAD_REQUEST
  252.             NOT_FOUND_404 -> NOT_FOUND
  253.             UNAUTHORIZED_401 -> UNAUTHORIZED
  254.             PAYMENT_REQUIRED_402 -> PAYMENT_REQUIRED
  255.             FORBIDDEN_403 -> FORBIDDEN
  256.             METHOD_NOT_ALLOWED_405 -> METHOD_NOT_ALLOWED
  257.             NOT_ACCEPTABLE_406 -> NOT_ACCEPTABLE
  258.             PROXY_AUTHENTICATION_REQUIRED_407 -> PROXY_AUTHENTICATION_REQUIRED
  259.             REQUEST_TIMEOUT_408 -> REQUEST_TIMEOUT
  260.             CONFLICT_409 -> CONFLICT
  261.             GONE_410 -> GONE
  262.             LENGTH_REQUIRED_411 -> LENGTH_REQUIRED
  263.             PRECONDITION_FAILED_412 -> PRECONDITION_FAILED
  264.             URI_TOO_LONG_414 -> REQUEST_URI_TOO_LONG
  265.             UNSUPPORTED_MEDIA_TYPE_415 -> UNSUPPORTED_MEDIA_TYPE
  266.             RANGE_NOT_SATISFIABLE_416 -> REQUESTED_RANGE_NOT_SATISFIABLE
  267.             EXPECTATION_FAILED_417 -> EXPECTATION_FAILED
  268.             MISDIRECTED_REQUEST_421 -> MISDIRECTED_REQUEST
  269.             UNPROCESSABLE_CONTENT_422 -> UNPROCESSABLE_ENTITY
  270.             LOCKED_423 -> LOCKED
  271.             FAILED_DEPENDENCY_424 -> FAILED_DEPENDENCY
  272.             UPGRADE_REQUIRED_426 -> UPGRADE_REQUIRED
  273.             PRECONDITION_REQUIRED_428 -> PRECONDITION_REQUIRED
  274.             TOO_MANY_REQUESTS_429 -> TOO_MANY_REQUESTS
  275.             REQUEST_HEADER_FIELDS_TOO_LARGE_431 -> REQUEST_HEADER_FIELDS_TOO_LARGE

  276.             INTERNAL_SERVER_ERROR_500 -> INTERNAL_SERVER_ERROR
  277.             NOT_IMPLEMENTED_501 -> NOT_IMPLEMENTED
  278.             BAD_GATEWAY_502 -> BAD_GATEWAY
  279.             SERVICE_UNAVAILABLE_503 -> SERVICE_UNAVAILABLE
  280.             GATEWAY_TIMEOUT_504 -> GATEWAY_TIMEOUT
  281.             HTTP_VERSION_NOT_SUPPORTED_505 -> HTTP_VERSION_NOT_SUPPORTED
  282.             VARIANT_ALSO_NEGOTIATES_506 -> VARIANT_ALSO_NEGOTIATES
  283.             INSUFFICIENT_STORAGE_507 -> INSUFFICIENT_STORAGE
  284.             NOT_EXTENDED_510 -> NOT_EXTENDED
  285.             NETWORK_AUTHENTICATION_REQUIRED_511 -> NETWORK_AUTHENTICATION_REQUIRED

  286.             else -> HttpResponseStatus(status, status.toString())
  287.         }
  288. }