NettyServerHandler.kt

  1. package com.hexagonkt.http.server.netty

  2. import com.hexagonkt.handlers.Context
  3. import com.hexagonkt.http.handlers.bodyToBytes
  4. import com.hexagonkt.http.model.*
  5. import com.hexagonkt.http.model.Cookie
  6. import com.hexagonkt.http.handlers.HttpHandler
  7. import com.hexagonkt.http.handlers.HttpContext
  8. import com.hexagonkt.http.model.CookieSameSite.*
  9. import com.hexagonkt.http.model.HttpCall
  10. import com.hexagonkt.http.model.HttpResponse
  11. import io.netty.buffer.Unpooled
  12. import io.netty.channel.Channel
  13. import io.netty.channel.ChannelFutureListener.CLOSE
  14. import io.netty.channel.ChannelHandlerContext
  15. import io.netty.channel.ChannelInboundHandlerAdapter
  16. import io.netty.handler.codec.http.*
  17. import io.netty.handler.codec.http.HttpHeaderNames.*
  18. import io.netty.handler.codec.http.HttpHeaderValues.CHUNKED
  19. import io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE
  20. import io.netty.handler.codec.http.HttpMethod
  21. import io.netty.handler.codec.http.HttpMethod.GET
  22. import io.netty.handler.codec.http.HttpRequest
  23. import io.netty.handler.codec.http.HttpResponseStatus.*
  24. import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
  25. import io.netty.handler.codec.http.cookie.CookieHeaderNames.SameSite.*
  26. import io.netty.handler.codec.http.cookie.DefaultCookie
  27. import io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT as STRICT_ENCODER
  28. import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
  29. import io.netty.handler.ssl.SslHandler
  30. import io.netty.handler.ssl.SslHandshakeCompletionEvent
  31. import java.security.cert.X509Certificate
  32. import java.util.concurrent.Flow.*
  33. import com.hexagonkt.http.model.HttpRequest as HexagonHttpRequest

  34. internal class NettyServerHandler(
  35.     private val handlers: Map<HttpMethod, HttpHandler>,
  36.     private val sslHandler: SslHandler?,
  37.     private val enableWebsockets: Boolean = true,
  38. ) : ChannelInboundHandlerAdapter() {

  39.     private var certificates: List<X509Certificate> = emptyList()

  40.     override fun channelRead(context: ChannelHandlerContext, nettyRequest: Any) {
  41.         if (nettyRequest is HttpRequest)
  42.             readHttpRequest(context, nettyRequest)
  43.     }

  44.     private fun readHttpRequest(context: ChannelHandlerContext, nettyRequest: HttpRequest) {
  45.         val result = nettyRequest.decoderResult()

  46.         if (result.isFailure)
  47.             throw IllegalStateException(result.cause())

  48.         val channel = context.channel()
  49.         val method = nettyRequest.method()
  50.         val pathHandler = handlers[method]

  51.         val headers = nettyRequest.headers()
  52.         val request = NettyRequestAdapter(method, nettyRequest, certificates, channel, headers)

  53.         if (pathHandler == null) {
  54.             writeResponse(context, request, HttpResponse(), HttpUtil.isKeepAlive(nettyRequest))
  55.             return
  56.         }

  57.         val resultContext = pathHandler.process(request)
  58.         val response = resultContext.event.response

  59.         val isWebSocket =
  60.             if (enableWebsockets) isWebsocket(headers, method, response.status)
  61.             else false

  62.         val body = response.body
  63.         val isSse = body is Publisher<*>

  64.         when {
  65.             isSse -> handleSse(context, request, response, body)
  66.             isWebSocket -> handleWebSocket(context, resultContext, response, nettyRequest, channel)
  67.             else -> writeResponse(context, request, response, HttpUtil.isKeepAlive(nettyRequest))
  68.         }
  69.     }

  70.     private fun isWebsocket(headers: HttpHeaders, method: HttpMethod, status: HttpStatus): Boolean {
  71.         val connection = headers[CONNECTION]?.lowercase()
  72.         val upgrade = headers[UPGRADE]?.lowercase()
  73.         return connection == "upgrade"
  74.             && upgrade == "websocket"
  75.             && method == GET
  76.             && status == ACCEPTED_202
  77.     }

  78.     @Suppress("UNCHECKED_CAST") // Body not cast to Publisher<HttpServerEvent> due to type erasure
  79.     private fun handleSse(
  80.         context: ChannelHandlerContext,
  81.         hexagonRequest: HttpRequestPort,
  82.         response: HttpResponsePort, body: Any,
  83.     ) {
  84.         val status = nettyStatus(response.status)
  85.         val nettyResponse = DefaultHttpResponse(HTTP_1_1, status)
  86.         val headers = nettyResponse.headers()

  87.         val hexagonHeaders = response.headers
  88.         if (hexagonHeaders.httpFields.isNotEmpty())
  89.             hexagonHeaders.values.map { (k, v) -> headers.add(k, v) }

  90.         val hexagonCookies = response.cookies
  91.         if (hexagonCookies.isNotEmpty()) {
  92.             val cookies = nettyCookies(hexagonRequest.protocol.secure, hexagonCookies)
  93.             headers[SET_COOKIE] = STRICT_ENCODER.encode(cookies)
  94.         }

  95.         val contentType = response.contentType
  96.         if (contentType != null)
  97.             headers[CONTENT_TYPE] = contentType.text

  98.         headers[TRANSFER_ENCODING] = CHUNKED
  99.         headers[CONNECTION] = KEEP_ALIVE
  100.         context.writeAndFlush(nettyResponse)

  101.         // TODO Close when publisher is done
  102.         val publisher = body as Publisher<ServerEvent>
  103.         publisher.subscribe(object : Subscriber<ServerEvent> {
  104.             override fun onError(throwable: Throwable) {}

  105.             override fun onComplete() {
  106.                 context.close()
  107.             }

  108.             override fun onSubscribe(subscription: Subscription) {
  109.                 subscription.request(Long.MAX_VALUE)
  110.             }

  111.             override fun onNext(item: ServerEvent) {
  112.                 val eventData = Unpooled.copiedBuffer(item.eventData.toByteArray())
  113.                 context.writeAndFlush(DefaultHttpContent(eventData))
  114.             }
  115.         })
  116.     }

  117.     private fun handleWebSocket(
  118.         context: ChannelHandlerContext,
  119.         request: Context<HttpCall>,
  120.         response: HttpResponsePort,
  121.         nettyRequest: HttpRequest,
  122.         channel: Channel
  123.     ) {
  124.         val session = NettyWsSession(context, HttpContext(request))
  125.         val nettyWebSocketHandler = NettyWebSocketHandler(
  126.             session,
  127.             response.onBinary,
  128.             response.onText,
  129.             response.onPing,
  130.             response.onPong,
  131.             response.onClose,
  132.         )

  133.         context.pipeline().replace(this, "webSocketHandler", nettyWebSocketHandler)
  134.         wsHandshake(nettyRequest, channel)
  135.         session.(response.onConnect)()
  136.     }

  137.     private fun wsHandshake(nettyRequest: HttpRequest, channel: Channel) {
  138.         val host = nettyRequest.headers()["host"]
  139.         val uri = nettyRequest.uri()
  140.         val url = "ws://$host$uri"
  141.         val wsFactory = WebSocketServerHandshakerFactory(url, null, true)
  142.         val handShaker = wsFactory.newHandshaker(nettyRequest)

  143.         if (handShaker == null)
  144.             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel)
  145.         else
  146.             handShaker.handshake(channel, nettyRequest)
  147.     }

  148.     override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
  149.         if (evt is SslHandshakeCompletionEvent && sslHandler != null) {
  150.             val peerCertificates = sslHandler.engine().session.peerCertificates
  151.             certificates = peerCertificates.map { it as X509Certificate }
  152.         }
  153.     }

  154.     @Suppress("OVERRIDE_DEPRECATION") // Deprecated in base interface, but allowed in parent class
  155.     override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
  156.         val body = "Failure: $cause\n"
  157.         val response = HttpResponse(body, status = INTERNAL_SERVER_ERROR_500)
  158.         writeResponse(context, HexagonHttpRequest(), response, false)
  159.     }

  160.     private fun writeResponse(
  161.         context: ChannelHandlerContext,
  162.         hexagonRequest: HttpRequestPort,
  163.         hexagonResponse: HttpResponsePort,
  164.         keepAlive: Boolean,
  165.     ) {
  166.         val buffer = Unpooled.copiedBuffer(bodyToBytes(hexagonResponse.body))
  167.         val status = nettyStatus(hexagonResponse.status)
  168.         val response = DefaultFullHttpResponse(HTTP_1_1, status, buffer)

  169.         val headers = response.headers()
  170.         val hexagonHeaders = hexagonResponse.headers
  171.         if (hexagonHeaders.httpFields.isNotEmpty())
  172.             hexagonHeaders.values.map { (k, v) -> headers.add(k, v) }

  173.         val hexagonCookies = hexagonResponse.cookies
  174.         if (hexagonCookies.isNotEmpty()) {
  175.             val cookies = nettyCookies(hexagonRequest.protocol.secure, hexagonCookies)
  176.             headers[SET_COOKIE] = STRICT_ENCODER.encode(cookies)
  177.         }

  178.         val contentType = hexagonResponse.contentType
  179.         if (contentType != null)
  180.             headers[CONTENT_TYPE] = contentType.text

  181.         if (keepAlive) {
  182.             headers.setInt(CONTENT_LENGTH, response.content().readableBytes())
  183.             headers[CONNECTION] = KEEP_ALIVE
  184.             context.writeAndFlush(response)
  185.         }
  186.         else {
  187.             context.writeAndFlush(response).addListener(CLOSE)
  188.         }
  189.     }

  190.     private fun nettyCookies(secureRequest: Boolean, hexagonCookies: List<Cookie>) =
  191.         hexagonCookies
  192.             .filter { if (secureRequest) true else !it.secure }
  193.             .map {
  194.                 DefaultCookie(it.name, it.value).apply {
  195.                     if (it.maxAge != -1L)
  196.                         setMaxAge(it.maxAge)
  197.                     isSecure = it.secure
  198.                     setPath(it.path)
  199.                     setDomain(it.domain)
  200.                     isHttpOnly = it.httpOnly
  201.                     it.domain?.let(::setDomain)
  202.                     it.sameSite
  203.                         ?.let { ss ->
  204.                             when (ss) {
  205.                                 STRICT -> Strict
  206.                                 LAX -> Lax
  207.                                 NONE -> None
  208.                             }
  209.                         }
  210.                         ?.let(::setSameSite)
  211.                 }
  212.             }

  213.     internal fun nettyStatus(status: HttpStatus): HttpResponseStatus =
  214.         when (status) {
  215.             CONTINUE_100 -> CONTINUE
  216.             SWITCHING_PROTOCOLS_101 -> SWITCHING_PROTOCOLS
  217.             PROCESSING_102 -> PROCESSING

  218.             OK_200 -> OK
  219.             CREATED_201 -> CREATED
  220.             ACCEPTED_202 -> ACCEPTED
  221.             NON_AUTHORITATIVE_INFORMATION_203 -> NON_AUTHORITATIVE_INFORMATION
  222.             NO_CONTENT_204 -> NO_CONTENT
  223.             RESET_CONTENT_205 -> RESET_CONTENT
  224.             PARTIAL_CONTENT_206 -> PARTIAL_CONTENT
  225.             MULTI_STATUS_207 -> MULTI_STATUS

  226.             MULTIPLE_CHOICES_300 -> MULTIPLE_CHOICES
  227.             MOVED_PERMANENTLY_301 -> MOVED_PERMANENTLY
  228.             FOUND_302 -> FOUND
  229.             SEE_OTHER_303 -> SEE_OTHER
  230.             NOT_MODIFIED_304 -> NOT_MODIFIED
  231.             USE_PROXY_305 -> USE_PROXY
  232.             TEMPORARY_REDIRECT_307 -> TEMPORARY_REDIRECT
  233.             PERMANENT_REDIRECT_308 -> PERMANENT_REDIRECT

  234.             BAD_REQUEST_400 -> BAD_REQUEST
  235.             NOT_FOUND_404 -> NOT_FOUND
  236.             UNAUTHORIZED_401 -> UNAUTHORIZED
  237.             PAYMENT_REQUIRED_402 -> PAYMENT_REQUIRED
  238.             FORBIDDEN_403 -> FORBIDDEN
  239.             METHOD_NOT_ALLOWED_405 -> METHOD_NOT_ALLOWED
  240.             NOT_ACCEPTABLE_406 -> NOT_ACCEPTABLE
  241.             PROXY_AUTHENTICATION_REQUIRED_407 -> PROXY_AUTHENTICATION_REQUIRED
  242.             REQUEST_TIMEOUT_408 -> REQUEST_TIMEOUT
  243.             CONFLICT_409 -> CONFLICT
  244.             GONE_410 -> GONE
  245.             LENGTH_REQUIRED_411 -> LENGTH_REQUIRED
  246.             PRECONDITION_FAILED_412 -> PRECONDITION_FAILED
  247.             URI_TOO_LONG_414 -> REQUEST_URI_TOO_LONG
  248.             UNSUPPORTED_MEDIA_TYPE_415 -> UNSUPPORTED_MEDIA_TYPE
  249.             RANGE_NOT_SATISFIABLE_416 -> REQUESTED_RANGE_NOT_SATISFIABLE
  250.             EXPECTATION_FAILED_417 -> EXPECTATION_FAILED
  251.             MISDIRECTED_REQUEST_421 -> MISDIRECTED_REQUEST
  252.             UNPROCESSABLE_CONTENT_422 -> UNPROCESSABLE_ENTITY
  253.             LOCKED_423 -> LOCKED
  254.             FAILED_DEPENDENCY_424 -> FAILED_DEPENDENCY
  255.             UPGRADE_REQUIRED_426 -> UPGRADE_REQUIRED
  256.             PRECONDITION_REQUIRED_428 -> PRECONDITION_REQUIRED
  257.             TOO_MANY_REQUESTS_429 -> TOO_MANY_REQUESTS
  258.             REQUEST_HEADER_FIELDS_TOO_LARGE_431 -> REQUEST_HEADER_FIELDS_TOO_LARGE

  259.             INTERNAL_SERVER_ERROR_500 -> INTERNAL_SERVER_ERROR
  260.             NOT_IMPLEMENTED_501 -> NOT_IMPLEMENTED
  261.             BAD_GATEWAY_502 -> BAD_GATEWAY
  262.             SERVICE_UNAVAILABLE_503 -> SERVICE_UNAVAILABLE
  263.             GATEWAY_TIMEOUT_504 -> GATEWAY_TIMEOUT
  264.             HTTP_VERSION_NOT_SUPPORTED_505 -> HTTP_VERSION_NOT_SUPPORTED
  265.             VARIANT_ALSO_NEGOTIATES_506 -> VARIANT_ALSO_NEGOTIATES
  266.             INSUFFICIENT_STORAGE_507 -> INSUFFICIENT_STORAGE
  267.             NOT_EXTENDED_510 -> NOT_EXTENDED
  268.             NETWORK_AUTHENTICATION_REQUIRED_511 -> NETWORK_AUTHENTICATION_REQUIRED

  269.             else -> HttpResponseStatus(status.code, status.toString())
  270.         }
  271. }