RESTinio
Loading...
Searching...
No Matches
connection.hpp
Go to the documentation of this file.
1/*
2 restinio
3*/
4
5/*!
6 HTTP-connection routine.
7*/
8
9#pragma once
11#include <restinio/asio_include.hpp>
12
13#include <llhttp.h>
14
15#include <restinio/impl/include_fmtlib.hpp>
16
17#include <restinio/exception.hpp>
18#include <restinio/http_headers.hpp>
19#include <restinio/request_handler.hpp>
20#include <restinio/connection_count_limiter.hpp>
21#include <restinio/impl/connection_base.hpp>
22#include <restinio/impl/header_helpers.hpp>
23#include <restinio/impl/response_coordinator.hpp>
24#include <restinio/impl/connection_settings.hpp>
25#include <restinio/impl/fixed_buffer.hpp>
26#include <restinio/impl/write_group_output_ctx.hpp>
27#include <restinio/impl/executor_wrapper.hpp>
28#include <restinio/impl/sendfile_operation.hpp>
29
30#include <restinio/utils/impl/safe_uint_truncate.hpp>
31#include <restinio/utils/at_scope_exit.hpp>
32
33namespace restinio
34{
35
36namespace impl
37{
38
39//
40// http_parser_ctx_t
41//
42
43//! Parsing result context for using in parser callbacks.
44/*!
45 All data is used as temps, and is usable only
46 after parsing completes new requests then it is moved out.
47*/
49{
50 //! Request data.
51 //! \{
52 http_request_header_t m_header;
53 std::string m_body;
54 //! \}
55
56 //! Parser context temp values and flags.
57 //! \{
59 std::size_t m_last_value_total_size{ 0u };
60
61 /*!
62 * @since v.0.6.9
63 */
65
66 /*!
67 * @since v.0.6.9
68 */
70
71 /*!
72 * @brief Chunk extnsion's params if any.
73 *
74 * @since v.0.7.0
75 */
77
78 /*!
79 * @brief How many bytes were parsed for current request.
80 *
81 * @since v.0.7.0
82 */
83 std::size_t m_bytes_parsed;
84
85 //! \}
86
87 //! Flag: is http message parsed completely.
88 bool m_message_complete{ false };
89
90 /*!
91 * @brief Total number of parsed HTTP-fields.
92 *
93 * This number includes the number of leading HTTP-fields and the number
94 * of trailing HTTP-fields (in the case of chunked encoding).
95 *
96 * @since v.0.6.12
97 */
98 std::size_t m_total_field_count{ 0u };
99
100 /*!
101 * @brief Limits for the incoming message.
102 *
103 * @since v.0.6.12
104 */
106
107 /*!
108 * @brief The main constructor.
110 * @since v.0.6.12
111 */
114 : m_limits{ limits }
115 {}
116
117 //! Prepare context to handle new request.
118 void
120 {
121 m_header = http_request_header_t{};
122 m_body.clear();
123 m_current_field_name.clear();
126 m_bytes_parsed = 0;
127 m_message_complete = false;
129 }
130
131 //! Creates an instance of chunked_input_info if there is an info
132 //! about chunks in the body.
133 /*!
134 * @since v.0.6.9
135 */
136 [[nodiscard]]
139 {
140 chunked_input_info_unique_ptr_t result;
141
142 if( !m_chunked_info_block.m_chunks.empty() ||
143 0u != m_chunked_info_block.m_trailing_fields.fields_count() )
144 {
145 result = std::make_unique< chunked_input_info_t >(
146 std::move( m_chunked_info_block ) );
147 }
148
149 return result;
150 }
151};
152
153//! Include parser callbacks.
155
156//
157// create_parser_settings()
158//
159
160//! Helper for setting parser settings.
161/*!
162 Is used to initialize const value in connection_settings_t ctor.
163*/
164template< typename Http_Methods >
165inline llhttp_settings_t
167{
168 llhttp_settings_t parser_settings;
169 llhttp_settings_init( &parser_settings );
170
171 parser_settings.on_url =
172 []( llhttp_t * parser, const char * at, size_t length ) -> int {
173 return restinio_url_cb( parser, at, length );
174 };
175
176 parser_settings.on_header_field =
177 []( llhttp_t * parser, const char * at, size_t length ) -> int {
178 return restinio_header_field_cb( parser, at, length );
179 };
180
181 parser_settings.on_header_field_complete =
182 []( llhttp_t * parser ) -> int {
183 return restinio_header_field_complete_cb( parser );
184 };
185
186 parser_settings.on_header_value =
187 []( llhttp_t * parser, const char * at, size_t length ) -> int {
188 return restinio_header_value_cb( parser, at, length );
189 };
190
191 parser_settings.on_header_value_complete =
192 []( llhttp_t * parser ) -> int {
193 return restinio_header_value_complete_cb( parser );
194 };
195
196 parser_settings.on_headers_complete =
197 []( llhttp_t * parser ) -> int {
198 return restinio_headers_complete_cb( parser );
199 };
200
201 parser_settings.on_body =
202 []( llhttp_t * parser, const char * at, size_t length ) -> int {
203 return restinio_body_cb( parser, at, length );
204 };
205
206 parser_settings.on_chunk_header =
207 []( llhttp_t * parser ) -> int {
208 return restinio_chunk_header_cb( parser );
209 };
210
211 parser_settings.on_chunk_complete =
212 []( llhttp_t * parser ) -> int {
213 return restinio_chunk_complete_cb( parser );
214 };
215
216 parser_settings.on_message_complete =
217 []( llhttp_t * parser ) -> int {
218 return restinio_message_complete_cb< Http_Methods >( parser );
219 };
220
221 parser_settings.on_chunk_extension_name =
222 []( llhttp_t * parser, const char * at, size_t length ) -> int {
223 return restinio_chunk_extension_name_cb( parser, at, length );
224 };
225
226 parser_settings.on_chunk_extension_value =
227 []( llhttp_t * parser, const char * at, size_t length ) -> int {
228 return restinio_chunk_extension_value_cb( parser, at, length );
229 };
230
231 parser_settings.on_chunk_extension_name_complete =
232 []( llhttp_t * parser ) -> int {
233 return restinio_chunk_extension_name_complete_cb( parser );
234 };
236 parser_settings.on_chunk_extension_value_complete =
237 []( llhttp_t * parser ) -> int {
238 return restinio_chunk_extension_value_complete_cb( parser );
239 };
240
241 return parser_settings;
242}
244//
245// connection_upgrade_stage_t
246//
247
248//! Enum for a flag specifying that connection is going to upgrade or not.
250{
251 //! No connection request in progress
253 //! Request with connection-upgrade header came and waits for
254 //! request handler to be called in non pipelined fashion
255 //! (it must be the only request that is handled at the moment).
257 //! Handler for request with connection-upgrade header was called
258 //! so any response data comming is for that request.
259 //! If connection transforms to websocket connection
260 //! then no further operations are expected.
262};
263
264//
265// connection_input_t
266//
267
268//! Data associated with connection read routine.
270{
272 std::size_t buffer_size,
274 const llhttp_settings_t* settings )
276 , m_buf{ buffer_size }
277 {
278 llhttp_init( &m_parser, llhttp_type_t::HTTP_REQUEST, settings );
279 m_parser.data = &m_parser_ctx;
280 }
281
282 //! HTTP-parser.
283 //! \{
284 llhttp_t m_parser;
286 //! \}
287
288 //! Input buffer.
290
291 //! Connection upgrade request stage.
294
295 //! Flag to track whether read operation is performed now.
297
298 //! Prepare parser for reading new http-message.
299 void
301 {
302 // Reinit parser.
303 llhttp_reset( &m_parser);
304
305 // Reset context and attach it to parser.
306 m_parser_ctx.reset();
307 }
308};
309
310template < typename Connection, typename Start_Read_CB, typename Failed_CB >
311void
313 asio_ns::ip::tcp::socket & ,
314 Connection & ,
315 Start_Read_CB start_read_cb,
316 Failed_CB )
317{
318 // No preparation is needed, start
319 start_read_cb();
320}
321
322// An overload for the case of non-TLS-connection.
323inline tls_socket_t *
325 asio_ns::ip::tcp::socket & ) noexcept
326{
327 return nullptr;
328}
329
330//
331// connection_t
332//
333
334//! Context for handling http connections.
335/*
336 Working circle consists of the following steps:
337 * wait for request -- reading from socket and parsing header and body;
338 * handling request -- once the request is completely obtained it's handling
339 is deligated to a handler chosen by handler factory;
340 * writing response -- writing response to socket;
341 * back to first step o close connection -- depending on keep-alive property
342 of the last response the connection goes back to first step or
343 shutdowns.
344
345 Each step is controlled by timer (\see schedule_operation_timeout_callback())
346
347 In case of errors connection closes itself.
348*/
349template < typename Traits >
350class connection_t final
351 : public connection_base_t
352 , public executor_wrapper_t< typename Traits::strand_t >
353{
355
356 public:
357 using timer_manager_t = typename Traits::timer_manager_t;
361 using logger_t = typename Traits::logger_t;
362 using strand_t = typename Traits::strand_t;
363 using stream_socket_t = typename Traits::stream_socket_t;
366
368 //! Connection id.
369 connection_id_t conn_id,
370 //! Connection socket.
371 stream_socket_t && socket,
372 //! Settings that are common for connections.
373 connection_settings_handle_t< Traits > settings,
374 //! Remote endpoint for that connection.
375 endpoint_t remote_endpoint,
376 //! Lifetime monitor to be used for handling connection count.
377 lifetime_monitor_t lifetime_monitor )
378 : connection_base_t{ conn_id }
379 , executor_wrapper_base_t{ socket.get_executor() }
380 , m_socket{ std::move( socket ) }
381 , m_settings{ std::move( settings ) }
383 , m_input{
387 }
391 , m_logger{ *( m_settings->m_logger ) }
392 , m_lifetime_monitor{ std::move(lifetime_monitor) }
393 {
394 // Notify of a new connection instance.
395 m_logger.trace( [&]{
396 return fmt::format(
398 "[connection:{}] start connection with {}" ),
399 connection_id(),
400 fmtlib_tools::streamed( m_remote_endpoint ) );
401 } );
402 }
403
404 // Disable copy/move.
405 connection_t( const connection_t & ) = delete;
407 connection_t & operator = ( const connection_t & ) = delete;
408 connection_t & operator = ( connection_t && ) = delete;
409
410 ~connection_t() override
411 {
412 restinio::utils::log_trace_noexcept( m_logger,
413 [&]{
414 return fmt::format(
416 "[connection:{}] destructor called" ),
418 } );
419 }
420
421 void
423 {
424 prepare_connection_and_start_read(
425 m_socket,
426 *this,
427 [ & ]{
428 // Inform state listener if it used.
429 m_settings->call_state_listener( [this]() noexcept {
430 return connection_state::notice_t{
431 this->connection_id(),
432 this->m_remote_endpoint,
433 connection_state::accepted_t{
434 make_tls_socket_pointer_for_state_listener(
435 m_socket )
436 }
437 };
438 } );
439
440 // Start timeout checking.
441 m_prepared_weak_ctx = shared_from_this();
443
444 // Start reading request.
446 },
447 [ & ]( const asio_ns::error_code & ec ){
448 trigger_error_and_close( [&]{
449 return fmt::format(
451 "[connection:{}] prepare connection error: {}" ),
453 ec.message() );
454 } );
455 } );
456 }
457
458 //! Start reading next htttp-message.
459 void
461 {
462 m_logger.trace( [&]{
463 return fmt::format(
465 "[connection:{}] start waiting for request" ),
467 } );
468
469 // Prepare parser for consuming new request message.
470 m_input.reset_parser();
471
472 // Guard total time for a request to be read.
473 // guarding here makes the total read process
474 // to run in read_next_http_message_timelimit.
476
477 if( 0 != m_input.m_buf.length() )
478 {
479 // If a pipeline requests were sent by client
480 // then the biginning (or even entire request) of it
481 // is in the buffer obtained from socket in previous
482 // read operation.
483 consume_data( m_input.m_buf.bytes(), m_input.m_buf.length() );
484 }
485 else
486 {
487 // Next request (if any) must be obtained from socket.
489 }
490 }
491
492 //! Internals that are necessary for upgrade.
494 {
496 upgrade_internals_t && ) = default;
497
499 connection_settings_handle_t< Traits > settings,
500 stream_socket_t socket,
501 lifetime_monitor_t lifetime_monitor )
503 , m_socket{ std::move( socket ) }
504 , m_lifetime_monitor{ std::move(lifetime_monitor) }
505 {}
506
510 };
511
512 //! Move socket out of connection.
515 {
516 return upgrade_internals_t{
517 m_settings,
518 std::move(m_socket),
519 std::move(m_lifetime_monitor)
520 };
521 }
522
523 private:
524 //! Start (continue) a chain of read-parse-read-... operations.
525 inline void
527 {
528 if( !m_input.m_read_operation_is_running )
529 {
530 m_logger.trace( [&]{
531 return fmt::format(
533 "[connection:{}] continue reading request" ),
535 } );
536
537
538 m_input.m_read_operation_is_running = true;
539 m_socket.async_read_some(
540 m_input.m_buf.make_asio_buffer(),
541 asio_ns::bind_executor(
542 this->get_executor(),
543 [this, ctx = shared_from_this()]
544 // NOTE: this lambda is noexcept since v.0.6.0.
545 ( const asio_ns::error_code & ec,
546 std::size_t length ) noexcept {
547 m_input.m_read_operation_is_running = false;
548 RESTINIO_ENSURE_NOEXCEPT_CALL( after_read( ec, length ) );
549 } ) );
550 }
551 else
552 {
553 m_logger.trace( [&]{
554 return fmt::format(
556 "[connection:{}] skip read operation: already running" ),
558 } );
559 }
560 }
561
562 //! Handle read operation result.
563 inline void
564 after_read( const asio_ns::error_code & ec, std::size_t length ) noexcept
565 {
566 if( !ec )
567 {
568 // Exceptions shouldn't go out of `after_read`.
569 // So intercept them and close the connection in the case
570 // of an exception.
571 try
572 {
573 m_logger.trace( [&]{
574 return fmt::format(
576 "[connection:{}] received {} bytes" ),
577 this->connection_id(),
578 length );
579 } );
580
581 m_input.m_buf.obtained_bytes( length );
582
583 consume_data( m_input.m_buf.bytes(), length );
584 }
585 catch( const std::exception & x )
586 {
587 trigger_error_and_close( [&] {
588 return fmt::format(
590 "[connection:{}] unexpected exception during the "
591 "handling of incoming data: {}" ),
593 x.what() );
594 } );
595 }
596 }
597 else
598 {
599 // Well, if it is actually an error
600 // then close connection.
602 {
603 if ( !error_is_eof( ec ) || 0 != m_input.m_parser_ctx.m_bytes_parsed )
604 trigger_error_and_close( [&]{
605 return fmt::format(
607 "[connection:{}] read socket error: {}; "
608 "parsed bytes: {}" ),
609 connection_id(),
610 ec.message(),
611 m_input.m_parser_ctx.m_bytes_parsed );
612 } );
613 else
614 {
615 // A case that is not such an error:
616 // on a connection (most probably keeped alive
617 // after previous request, but a new also applied)
618 // no bytes were consumed and remote peer closes connection.
619 restinio::utils::log_trace_noexcept( m_logger,
620 [&]{
621 return fmt::format(
623 "[connection:{}] EOF and no request, "
624 "close connection" ),
626 } );
627
629 }
630 }
631 // else: read operation was cancelled.
632 }
633 }
634
635 //! Parse some data.
636 void
637 consume_data( const char * data, std::size_t length )
638 {
639 auto * parser = &m_input.m_parser;
640
641 const auto parse_err =
642 llhttp_execute( parser, data, length );
643
644 const auto nparsed = [&]{
645 if( !parser->error_pos )
646 return length;
647 return static_cast< std::size_t >( parser->error_pos - data );
648 }();
649
650 if( nparsed > length )
651 {
652 // Parser is in the unreliable state,
653 // so we done with this connection.
654 trigger_error_and_close( [&]{
655 return fmt::format(
657 "[connection:{}] unexpected parser behavior: "
658 "llhttp_execute() reports parsed bytes number ({}) "
659 "is greater than the size of a buffer ({})"
660 "that was fed to the parser" ),
662 nparsed,
663 length );
664 } );
665 return;
666 }
667
668 m_input.m_parser_ctx.m_bytes_parsed += nparsed;
669
670 // If entire http-message was obtained,
671 // parser is stopped and the might be a part of consecutive request
672 // left in buffer, so we mark how many bytes were obtained.
673 // and next message read (if any) will be started from already existing
674 // data left in buffer.
675 m_input.m_buf.consumed_bytes( nparsed );
676
677 if( HPE_OK != parse_err &&
678 HPE_PAUSED != parse_err &&
679 HPE_PAUSED_UPGRADE != parse_err )
680 {
681 // Parser error must be the one defined by llhttp_errno.
682 // It is possible to get a random error value
683 // (e.g. providing parser-callbacks that return
684 // unconventional errors), here we put an assert
685 // to quickly highlight the problems.
686 // As all the parser-callbacks are implemented by
687 // restinio the assert bellow MUST pass.
688 assert( llhttp_errno::HPE_OK <= parse_err &&
689 llhttp_errno::HPE_CB_RESET >= parse_err );
690
691 // TODO: handle case when there are some request in process.
692 trigger_error_and_close( [&]{
693 return fmt::format(
695 "[connection:{}] parser error {}: {}" ),
697 llhttp_errno_name( parse_err ),
698 llhttp_get_error_reason( parser ) );
699 } );
700
701 // nothing to do.
702 return;
703 }
704
705 if( m_input.m_parser_ctx.m_message_complete )
706 {
708 }
709 else
711 }
712
713 //! Handle a given request message.
714 void
716 {
717 try
718 {
719 auto & parser = m_input.m_parser;
720 auto & parser_ctx = m_input.m_parser_ctx;
721
722 if( m_input.m_parser.upgrade )
723 {
724 // Start upgrade connection operation.
725
726 // The first thing is to make sure
727 // that upgrade request will be handled in
728 // a non pipelined fashion.
729 m_input.m_connection_upgrade_stage =
730 connection_upgrade_stage_t::pending_upgrade_handling;
731 }
732
733 if( connection_upgrade_stage_t::none ==
734 m_input.m_connection_upgrade_stage )
735 {
736 // Run ordinary HTTP logic.
737 const auto request_id = m_response_coordinator.register_new_request();
738
739 m_logger.trace( [&]{
740 return fmt::format(
742 "[connection:{}] request received (#{}): {} {}" ),
744 request_id,
745 llhttp_method_name(
746 static_cast<llhttp_method>( parser.method ) ),
747 parser_ctx.m_header.request_target() );
748 } );
749
750 // TODO: mb there is a way to
751 // track if response was emmited immediately in handler
752 // or it was delegated
753 // so it is possible to omit this timer scheduling.
755
756 const auto handling_result =
757 m_request_handler(
758 std::make_shared< generic_request_t >(
759 request_id,
760 std::move( parser_ctx.m_header ),
761 std::move( parser_ctx.m_body ),
762 parser_ctx.make_chunked_input_info_if_necessary(),
763 shared_from_concrete< connection_base_t >(),
764 m_remote_endpoint,
765 m_settings->extra_data_factory() ) );
766
767 switch( handling_result )
768 {
769 case request_handling_status_t::not_handled:
770 case request_handling_status_t::rejected:
771 // If handler refused request, say not implemented.
772 write_response_parts_impl(
773 request_id,
774 response_output_flags_t{
775 response_parts_attr_t::final_parts,
776 response_connection_attr_t::connection_close },
777 write_group_t{ create_not_implemented_resp() } );
778 break;
779
781 if( m_response_coordinator.is_able_to_get_more_messages() )
782 {
783 // Request was accepted,
784 // didn't create immediate response that closes connection after,
785 // and it is possible to receive more requests
786 // then start consuming yet another request.
788 }
789 break;
790 }
791 }
792 else
793 {
794 m_logger.trace( [&]{
795 const std::string default_value{};
796
797 return fmt::format(
799 "[connection:{}] upgrade request received: {} {}; "
800 "Upgrade: '{}';" ),
802 llhttp_method_name(
803 static_cast<llhttp_method>( parser.method ) ),
804 parser_ctx.m_header.request_target(),
805 parser_ctx.m_header.get_field_or(
806 http_field::upgrade, default_value ) );
807 } );
808
809 if( m_response_coordinator.empty() )
810 {
811 // There are no requests in handling
812 // So the current request with upgrade
813 // is the only one and can be handled directly.
814 // It is safe to call a handler for it.
816 }
817 else
818 {
819 // There are pipelined request
820 m_logger.trace( [&]{
821 return fmt::format(
823 "[connection:{}] upgrade request happened to "
824 "be a pipelined one, "
825 "and will be handled after previous requests "
826 "are handled" ),
828 } );
829 }
830
831 // No further actions (like continue reading) in both cases are needed.
832 }
833
834 }
835 catch( const std::exception & ex )
836 {
837 trigger_error_and_close( [&]{
838 return fmt::format(
840 "[connection:{}] error while handling request: {}" ),
841 this->connection_id(),
842 ex.what() );
843 } );
844 }
845 }
846
847 //! Calls handler for upgrade request.
848 /*!
849 Request data must be in input context (m_input).
850 */
851 void
853 {
854 auto & parser = m_input.m_parser;
855 auto & parser_ctx = m_input.m_parser_ctx;
856
857 // If user responses with error
858 // then connection must be able to send
859 // (hence to receive) response.
860
861 const auto request_id = m_response_coordinator.register_new_request();
862
863 m_logger.info( [&]{
864 return fmt::format(
866 "[connection:{}] handle upgrade request (#{}): {} {}" ),
868 request_id,
869 llhttp_method_name(
870 static_cast<llhttp_method>( parser.method ) ),
871 parser_ctx.m_header.request_target() );
872 } );
873
874 // Do not guard upgrade request.
876
877 // After calling handler we expect the results or
878 // no further operations with connection
879 m_input.m_connection_upgrade_stage =
880 connection_upgrade_stage_t::wait_for_upgrade_handling_result_or_nothing;
881
882 const auto handling_result = m_request_handler(
883 std::make_shared< generic_request_t >(
884 request_id,
885 std::move( parser_ctx.m_header ),
886 std::move( parser_ctx.m_body ),
887 parser_ctx.make_chunked_input_info_if_necessary(),
888 shared_from_concrete< connection_base_t >(),
889 m_remote_endpoint,
890 m_settings->extra_data_factory() ) );
891 switch( handling_result )
892 {
895 if( m_socket.is_open() )
896 {
897 // Request is rejected, so our socket
898 // must not be moved out to websocket connection.
899
900 // If handler refused request, say not implemented.
901 write_response_parts_impl(
902 request_id,
903 response_output_flags_t{
904 response_parts_attr_t::final_parts,
905 response_connection_attr_t::connection_close },
906 write_group_t{ create_not_implemented_resp() } );
907 }
908 else
909 {
910 // Request is rejected, but the socket
911 // was moved out to somewhere else???
912
913 m_logger.error( [&]{
914 return fmt::format(
916 "[connection:{}] upgrade request handler rejects "
917 "request, but socket was moved out from connection" ),
919 } );
920 }
921 break;
922
924 /* nothing to do */
925 break;
926 }
927
928 // Else 2 cases:
929 // 1. request is handled asynchronously, so
930 // what happens next depends on handling.
931 // 2. handling was immediate, so further destiny
932 // of a connection was already determined.
933 //
934 // In both cases: here do nothing.
935 // We can't even do read-only access because
936 // upgrade handling might take place
937 // in distinct execution context.
938 // So no even a log messages here.
939 }
940
941 //! Write parts for specified request.
942 virtual void
944 //! Request id.
945 request_id_t request_id,
946 //! Resp output flag.
947 response_output_flags_t response_output_flags,
948 //! Part of the response data.
949 write_group_t wg ) override
950 {
951 //! Run write message on io_context loop if possible.
952 asio_ns::dispatch(
953 this->get_executor(),
954 [ this,
955 request_id,
956 response_output_flags,
957 actual_wg = std::move( wg ),
958 ctx = shared_from_this() ]
959 // NOTE that this lambda is noexcept since v.0.6.0.
960 () mutable noexcept
961 {
962 try
963 {
964 write_response_parts_impl(
965 request_id,
966 response_output_flags,
967 std::move( actual_wg ) );
968 }
969 catch( const std::exception & ex )
970 {
971 trigger_error_and_close( [&]{
972 return fmt::format(
974 "[connection:{}] unable to handle response: {}" ),
976 ex.what() );
977 } );
978 }
979 } );
980 }
981
982 //! Write parts for specified request.
983 void
985 //! Request id.
986 request_id_t request_id,
987 //! Resp output flag.
988 response_output_flags_t response_output_flags,
989 //! Part of the response data.
990 write_group_t wg )
991 {
992 auto invoke_after_write_cb_with_error = [&]{
993 try
994 {
998 }
999 catch( const std::exception & ex )
1000 {
1001 m_logger.error( [&]{
1002 return fmt::format(
1004 "[connection:{}] notificator error: {}" ),
1006 ex.what() );
1007 } );
1008 }
1009 };
1010
1011 if( m_socket.is_open() )
1012 {
1013 if( connection_upgrade_stage_t::
1014 wait_for_upgrade_handling_result_or_nothing ==
1015 m_input.m_connection_upgrade_stage )
1016 {
1017 // It is response for a connection-upgrade request.
1018 // If we receive it here then it is constructed via
1019 // message builder and so connection was not transformed
1020 // to websocket connection.
1021 // So it is necessary to resume pipeline logic that was stopped
1022 // for upgrade-request to be handled as the only request
1023 // on the connection for that moment.
1024 if( !m_response_coordinator.is_full() )
1025 {
1027 }
1028 }
1029
1030 if( !m_response_coordinator.closed() )
1031 {
1032 m_logger.trace( [&]{
1033 return fmt::format(
1035 "[connection:{}] append response (#{}), "
1036 "flags: {}, write group size: {}" ),
1038 request_id,
1039 fmtlib_tools::streamed( response_output_flags ),
1040 wg.items_count() );
1041 } );
1042
1043 m_response_coordinator.append_response(
1044 request_id,
1045 response_output_flags,
1046 std::move( wg ) );
1047
1049 }
1050 else
1051 {
1052 m_logger.warn( [&]{
1053 return fmt::format(
1055 "[connection:{}] receive response parts for "
1056 "request (#{}), but response with connection-close "
1057 "attribute happened before" ),
1059 request_id );
1060 } );
1061 invoke_after_write_cb_with_error();
1062 }
1063 }
1064 else
1065 {
1066 m_logger.warn( [&]{
1067 return fmt::format(
1069 "[connection:{}] try to write response, "
1070 "while socket is closed" ),
1072 } );
1073 invoke_after_write_cb_with_error();
1074 }
1075 }
1076
1077 // Check if there is something to write,
1078 // and if so starts write operation.
1079 void
1081 {
1082 assert( !m_response_coordinator.closed() );
1083
1084 if( !m_write_output_ctx.transmitting() )
1085 {
1087 }
1088 }
1089
1090 //! Initiate write operation.
1091 void
1093 {
1094 // Here: not writing anything to socket, so
1095 // write operation can be initiated.
1096
1097 // Remember if all response cells were busy.
1098 const bool response_coordinator_full_before =
1099 m_response_coordinator.is_full();
1100
1101 auto next_write_group = m_response_coordinator.pop_ready_buffers();
1102
1103 if( next_write_group )
1104 {
1105 m_logger.trace( [&]{
1106 return fmt::format(
1108 "[connection:{}] start next write group for response (#{}), "
1109 "size: {}" ),
1110 this->connection_id(),
1111 next_write_group->second,
1112 next_write_group->first.items_count() );
1113 } );
1114
1115 // Check if all response cells busy:
1116 const bool response_coordinator_full_after =
1117 m_response_coordinator.is_full();
1118
1119 // Whether we need to resume read after this group is written?
1121 response_coordinator_full_before &&
1122 !response_coordinator_full_after;
1123
1124 if( 0 < next_write_group->first.status_line_size() )
1125 {
1126 // We need to extract status line out of the first buffer
1127 assert(
1129 next_write_group->first.items().front().write_type() );
1130
1131 m_logger.trace( [&]{
1132 // Get status line:
1133 const string_view_t status_line{
1134 asio_ns::buffer_cast< const char * >(
1135 next_write_group->first.items().front().buf() ),
1136 next_write_group->first.status_line_size() };
1137
1138 return fmt::format(
1140 "[connection:{}] start response (#{}): {}" ),
1141 this->connection_id(),
1142 next_write_group->second,
1143 fmtlib_tools::streamed( status_line ) );
1144 } );
1145 }
1146
1147 // Initialize write context with a new write group.
1148 m_write_output_ctx.start_next_write_group(
1149 std::move( next_write_group->first ) );
1150
1151 // Start the loop of sending data from current write group.
1153 }
1154 else
1155 {
1157 }
1158 }
1159
1160 // Use aliases for shorter names.
1164
1165 //! Start/continue/continue handling output data of current write group.
1166 /*!
1167 This function is a starting point of a loop process of sending data
1168 from a given write group.
1169 It extracts the next bunch of trivial buffers or a
1170 sendfile-runner and starts an appropriate write operation.
1171 In data of a given write group finishes,
1172 finish_handling_current_write_ctx() is invoked thus breaking the loop.
1173
1174 @note
1175 Since v.0.6.0 this method is noexcept.
1176 */
1177 void
1179 {
1180 try
1181 {
1182 auto wo = m_write_output_ctx.extract_next_write_operation();
1183
1184 if( std::holds_alternative< trivial_write_operation_t >( wo ) )
1185 {
1187 }
1188 else if( std::holds_alternative< file_write_operation_t >( wo ) )
1189 {
1191 }
1192 else
1193 {
1194 assert( std::holds_alternative< none_write_operation_t >( wo ) );
1196 }
1197 }
1198 catch( const std::exception & ex )
1199 {
1200 trigger_error_and_close( [&]{
1201 return fmt::format(
1203 "[connection:{}] handle_current_write_ctx failed: {}" ),
1205 ex.what() );
1206 } );
1207 }
1208 }
1209
1210 //! Run trivial buffers write operation.
1211 void
1213 {
1214 // Asio buffers (param for async write):
1215 auto & bufs = op.get_trivial_bufs();
1216
1217 if( m_response_coordinator.closed() )
1218 {
1219 m_logger.trace( [&]{
1220 return fmt::format(
1222 "[connection:{}] sending resp data with "
1223 "connection-close attribute "
1224 "buf count: {}, "
1225 "total size: {}" ),
1227 bufs.size(),
1228 op.size() );
1229 } );
1230
1231 // Reading new requests is useless.
1232 asio_ns::error_code ignored_ec;
1233 m_socket.cancel( ignored_ec );
1234 }
1235 else
1236 {
1237 m_logger.trace( [&]{
1238 return fmt::format(
1240 "[connection:{}] sending resp data, "
1241 "buf count: {}, "
1242 "total size: {}" ),
1244 bufs.size(),
1245 op.size() ); } );
1246 }
1247
1248 // There is somethig to write.
1249 asio_ns::async_write(
1250 m_socket,
1251 bufs,
1252 asio_ns::bind_executor(
1253 this->get_executor(),
1254 [this, ctx = shared_from_this()]
1255 // NOTE: since v.0.6.0 this lambda is noexcept.
1256 ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1257 {
1258 if( !ec )
1259 {
1260 restinio::utils::log_trace_noexcept( m_logger,
1261 [&]{
1262 return fmt::format(
1264 "[connection:{}] outgoing data was "
1265 "sent: {} bytes" ),
1267 written );
1268 } );
1269 }
1270
1272 } ) );
1273
1275 }
1276
1277 //! Run sendfile write operation.
1278 void
1280 {
1281 if( m_response_coordinator.closed() )
1282 {
1283 m_logger.trace( [&]{
1284 return fmt::format(
1286 "[connection:{}] sending resp file data with "
1287 "connection-close attribute, "
1288 "total size: {}" ),
1290 op.size() );
1291 } );
1292
1293 // Reading new requests is useless.
1294 asio_ns::error_code ignored_ec;
1295 m_socket.cancel( ignored_ec );
1296 }
1297 else
1298 {
1299 m_logger.trace( [&]{
1300 return fmt::format(
1302 "[connection:{}] sending resp file data, total size: {}" ),
1304 op.size() );
1305 } );
1306 }
1307
1309
1310 auto op_ctx = op;
1311
1312 op_ctx.start_sendfile_operation(
1313 this->get_executor(),
1314 m_socket,
1315 asio_ns::bind_executor(
1316 this->get_executor(),
1317 [this, ctx = shared_from_this(),
1318 // Store operation context till the end
1319 op_ctx ]
1320 // NOTE: since v.0.6.0 this lambda is noexcept
1321 (const asio_ns::error_code & ec, file_size_t written ) mutable noexcept
1322 {
1323 // NOTE: op_ctx should be reset just before return from
1324 // that lambda. We can't call reset() until the end of
1325 // the lambda because lambda object itself will be
1326 // destroyed.
1327 auto op_ctx_reseter = restinio::utils::at_scope_exit(
1328 [&op_ctx] {
1329 // Reset sendfile operation context.
1331 } );
1332
1333 if( !ec )
1334 {
1335 restinio::utils::log_trace_noexcept( m_logger,
1336 [&]{
1337 return fmt::format(
1339 "[connection:{}] file data was sent: "
1340 "{} bytes" ),
1342 written );
1343 } );
1344 }
1345 else
1346 {
1347 restinio::utils::log_error_noexcept( m_logger,
1348 [&]{
1349 return fmt::format(
1351 "[connection:{}] send file data error: "
1352 "{} ({}) bytes" ),
1354 ec.value(),
1355 ec.message() );
1356 } );
1357 }
1358
1360 } ) );
1361 }
1362
1363 //! Do post write actions for current write group.
1364 void
1366 {
1367 // Finishing writing this group.
1368 m_logger.trace( [&]{
1369 return fmt::format(
1371 "[connection:{}] finishing current write group" ),
1372 this->connection_id() );
1373 } );
1374
1375 // Group notificators are called from here (if exist):
1376 m_write_output_ctx.finish_write_group();
1377
1378 if( !m_response_coordinator.closed() )
1379 {
1380 m_logger.trace( [&]{
1381 return fmt::format(
1383 "[connection:{}] should keep alive" ),
1384 this->connection_id() );
1385 } );
1386
1387 if( connection_upgrade_stage_t::none ==
1388 m_input.m_connection_upgrade_stage )
1389 {
1390 // Run ordinary HTTP logic.
1392 {
1394 }
1395
1396 // Start another write opertion
1397 // if there is something to send.
1399 }
1400 else
1401 {
1402 if( m_response_coordinator.empty() )
1403 {
1404 // Here upgrade req is the only request
1405 // to be handled by this connection.
1406 // So it is safe to call a handler for it.
1408 }
1409 else
1410 {
1411 // Do not start reading in any case,
1412 // but if there is at least one request preceding
1413 // upgrade-req, logic must continue http interaction.
1415 }
1416 }
1417 }
1418 else
1419 {
1420 // No keep-alive, close connection.
1421 close();
1422 }
1423 }
1424
1425 void
1427 {
1428 if( m_response_coordinator.closed() )
1429 {
1430 // Bufs empty but there happened to
1431 // be a response context marked as complete
1432 // (final_parts) and having connection-close attr.
1433 // It is because `init_write_if_necessary()`
1434 // is called only under `!m_response_coordinator.closed()`
1435 // condition, so if no bufs were obtained
1436 // and response coordinator is closed means
1437 // that a first response stored by
1438 // response coordinator was marked as complete
1439 // without data.
1440
1441 m_logger.trace( [&]{
1442 return fmt::format(
1444 "[connection:{}] last sent response was marked "
1445 "as complete" ),
1446 connection_id() ); } );
1447 close();
1448 }
1449 else
1450 {
1451 // Not writing anything, so need to deal with timouts.
1452 if( m_response_coordinator.empty() )
1453 {
1454 // No requests in processing.
1455 // So set read next request timeout.
1457 }
1458 else
1459 {
1460 // Have requests in process.
1461 // So take control over request handling.
1463 }
1464 }
1465 }
1466
1467 //! Handle write response finished.
1468 /*!
1469 * @note
1470 * Since v.0.6.0 this method is noexcept.
1471 */
1472 void
1473 after_write( const asio_ns::error_code & ec ) noexcept
1474 {
1475 if( !ec )
1476 {
1478 }
1479 else
1480 {
1482 {
1483 trigger_error_and_close( [&]{
1484 return fmt::format(
1486 "[connection:{}] unable to write: {}" ),
1488 ec.message() );
1489 } );
1490 }
1491 // else: Operation aborted only in case of close was called.
1492
1493 try
1494 {
1495 m_write_output_ctx.fail_write_group( ec );
1496 }
1497 catch( const std::exception & ex )
1498 {
1499 restinio::utils::log_error_noexcept( m_logger,
1500 [&]{
1501 return fmt::format(
1503 "[connection:{}] notificator error: {}" ),
1505 ex.what() );
1506 } );
1507 }
1508 }
1509 }
1510
1511 //! Close connection functions.
1512 //! \{
1513
1514 //! Standard close routine.
1515 void
1516 close() noexcept
1517 {
1518 restinio::utils::log_trace_noexcept( m_logger,
1519 [&]{
1520 return fmt::format(
1521 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] close" ),
1523 } );
1524
1525 // shutdown() and close() should be called regardless of
1526 // possible exceptions.
1527 restinio::utils::suppress_exceptions(
1528 m_logger,
1529 "connection.socket.shutdown",
1530 [this] {
1531 asio_ns::error_code ignored_ec;
1532 m_socket.shutdown(
1533 asio_ns::ip::tcp::socket::shutdown_both,
1534 ignored_ec );
1535 } );
1536 restinio::utils::suppress_exceptions(
1537 m_logger,
1538 "connection.socket.close",
1539 [this] {
1540 m_socket.close();
1541 } );
1542
1543 restinio::utils::log_trace_noexcept( m_logger,
1544 [&]{
1545 return fmt::format(
1547 "[connection:{}] close: close socket" ),
1549 } );
1550
1551 // Clear stuff.
1553
1554 restinio::utils::log_trace_noexcept( m_logger,
1555 [&]{
1556 return fmt::format(
1558 "[connection:{}] close: timer canceled" ),
1560 } );
1561
1562 RESTINIO_ENSURE_NOEXCEPT_CALL( m_response_coordinator.reset() );
1563
1564 restinio::utils::log_trace_noexcept( m_logger,
1565 [&]{
1566 return fmt::format(
1568 "[connection:{}] close: reset responses data" ),
1570 } );
1571
1572 // Inform state listener if it used.
1573 m_settings->call_state_listener_suppressing_exceptions(
1574 [this]() noexcept {
1575 return connection_state::notice_t{
1576 this->connection_id(),
1577 this->m_remote_endpoint,
1578 connection_state::closed_t{}
1579 };
1580 } );
1581 }
1582
1583 //! Trigger an error.
1584 /*!
1585 Closes the connection and write to log
1586 an error message.
1587 */
1588 template< typename Message_Builder >
1589 void
1590 trigger_error_and_close( Message_Builder msg_builder ) noexcept
1591 {
1592 // An exception from logger/msg_builder shouldn't prevent
1593 // a call to close().
1594 restinio::utils::log_error_noexcept(
1595 m_logger, std::move(msg_builder) );
1596
1598 }
1599 //! \}
1600
1601 //! Connection.
1603
1604 //! Common paramaters of a connection.
1606
1607 //! Remote endpoint for this connection.
1609
1610 //! Input routine.
1612
1613 //! Write to socket operation context.
1615
1616 // Memo flag: whether we need to resume read after this group is written
1618
1619 //! Response coordinator.
1621
1622 //! Timer to controll operations.
1623 //! \{
1624
1625 //! Check timeouts for all activities.
1626 static connection_t &
1628 {
1629 return static_cast< connection_t & >( base );
1630 }
1631
1632 //! Schedules real timedout operations check on
1633 //! the executer of a connection.
1634 virtual void
1636 {
1637 asio_ns::dispatch(
1638 this->get_executor(),
1639 [ ctx = std::move( self ) ]
1640 // NOTE: this lambda is noexcept since v.0.6.0.
1641 () noexcept {
1642 auto & conn_object = cast_to_self( *ctx );
1643 // If an exception will be thrown we can only
1644 // close the connection.
1645 try
1646 {
1647 conn_object.check_timeout_impl();
1648 }
1649 catch( const std::exception & x )
1650 {
1651 conn_object.trigger_error_and_close( [&] {
1652 return fmt::format(
1654 "[connection: {}] unexpected "
1655 "error during timeout handling: {}" ),
1656 conn_object.connection_id(),
1657 x.what() );
1658 } );
1659 }
1660 } );
1661 }
1662
1663 //! Callback type for timedout operations.
1664 using timout_cb_t = void (connection_t::* )( void );
1665
1666 //! Callback to all if timeout happened.
1668
1669 //! Timeout point of a current guarded operation.
1670 std::chrono::steady_clock::time_point m_current_timeout_after;
1671 //! Timer guard.
1673 //! A prepared weak handle for passing it to timer guard.
1675
1676 //! Check timed out operation.
1677 void
1679 {
1680 if( std::chrono::steady_clock::now() > m_current_timeout_after )
1681 {
1683 (this->*m_current_timeout_cb)();
1684 }
1685 else
1686 {
1688 }
1689 }
1690
1691 //! Schedule next timeout checking.
1692 void
1694 {
1695 m_timer_guard.schedule( m_prepared_weak_ctx );
1696 }
1697
1698 //! Stop timout guarding.
1699 void
1701 {
1702 m_current_timeout_cb = nullptr;
1704 }
1705
1706 //! Helper function to work with timer guard.
1707 void
1709 std::chrono::steady_clock::time_point timeout_after,
1710 timout_cb_t timout_cb )
1711 {
1712 m_current_timeout_after = timeout_after;
1713 m_current_timeout_cb = timout_cb;
1714 }
1715
1716 void
1718 std::chrono::steady_clock::duration timeout,
1719 timout_cb_t timout_cb )
1720 {
1721 schedule_operation_timeout_callback(
1722 std::chrono::steady_clock::now() + timeout,
1723 timout_cb );
1724 }
1725
1726 void
1727 handle_xxx_timeout( const char * operation_name )
1728 {
1729 m_logger.trace( [&]{
1730 return fmt::format(
1731 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] {} timed out" ),
1733 operation_name );
1734 } );
1735
1736 close();
1737 }
1738
1739 void
1741 {
1742 handle_xxx_timeout( "wait for request" );
1743 }
1744
1745 //! Statr guard read operation if necessary.
1746 void
1748 {
1749 if( m_response_coordinator.empty() )
1750 {
1751 schedule_operation_timeout_callback(
1752 m_settings->m_read_next_http_message_timelimit,
1753 &connection_t::handle_read_timeout );
1754 }
1755 }
1756
1757 void
1759 {
1760 handle_xxx_timeout( "handle request" );
1761 }
1762
1763 //! Start guard request handling operation if necessary.
1764 void
1766 {
1767 if( !m_write_output_ctx.transmitting() )
1768 {
1769 schedule_operation_timeout_callback(
1770 m_settings->m_handle_request_timeout,
1771 &connection_t::handle_request_handling_timeout );
1772 }
1773 }
1774
1775 void
1777 {
1778 handle_xxx_timeout( "writing response" );
1779 }
1780
1781 //! Start guard write operation if necessary.
1782 void
1784 {
1785 schedule_operation_timeout_callback(
1786 m_settings->m_write_http_response_timelimit,
1787 &connection_t::handle_write_response_timeout );
1788 }
1789
1790 void
1792 {
1793 handle_xxx_timeout( "writing response (sendfile)" );
1794 }
1795
1796 void
1797 guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1798 {
1799 if( std::chrono::steady_clock::duration::zero() == timelimit )
1800 timelimit = m_settings->m_write_http_response_timelimit;
1801
1802 schedule_operation_timeout_callback(
1803 timelimit,
1805 }
1806 //! \}
1807
1808 //! Request handler.
1810
1811 //! Logger for operation
1813
1814 /*!
1815 * @brief Monitor of the connection lifetime.
1816 *
1817 * It's required for controlling the count of active parallel
1818 * connections.
1819 *
1820 * @since v.0.6.12
1821 */
1823};
1824
1825//
1826// connection_factory_t
1827//
1828
1829//! Factory for connections.
1830template < typename Traits >
1832{
1833 public:
1834 using logger_t = typename Traits::logger_t;
1835 using stream_socket_t = typename Traits::stream_socket_t;
1838
1840 connection_settings_handle_t< Traits > connection_settings,
1841 std::unique_ptr< socket_options_setter_t > socket_options_setter )
1845 {}
1846
1847 // NOTE: since v.0.6.3 it returns non-empty
1848 // shared_ptr<connection_t<Traits>> or an exception is thrown in
1849 // the case of an error.
1850 // NOTE: since v.0.6.12 it accepts yet another parameter: lifetime_monitor.
1851 auto
1853 stream_socket_t socket,
1854 endpoint_t remote_endpoint,
1855 lifetime_monitor_t lifetime_monitor )
1856 {
1857 using connection_type_t = connection_t< Traits >;
1858
1859 {
1860 socket_options_t options{ socket.lowest_layer() };
1861 (*m_socket_options_setter)( options );
1862 }
1863
1864 return std::make_shared< connection_type_t >(
1865 m_connection_id_counter++,
1866 std::move( socket ),
1867 m_connection_settings,
1868 std::move( remote_endpoint ),
1869 std::move( lifetime_monitor ) );
1870 }
1871
1872 private:
1874
1876
1878
1880};
1881
1882} /* namespace impl */
1883
1884} /* namespace restinio */
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
typename Traits::logger_t logger_t
connection_settings_handle_t< Traits > m_connection_settings
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
typename Traits::stream_socket_t stream_socket_t
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules real timedout operations check on the executer of a connection.
connection_t & operator=(const connection_t &)=delete
typename Traits::strand_t strand_t
void handle_upgrade_request()
Calls handler for upgrade request.
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
write_group_output_ctx_t::file_write_operation_t file_write_operation_t
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
void on_request_message_complete()
Handle a given request message.
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
write_group_output_ctx_t::none_write_operation_t none_write_operation_t
executor_wrapper_t< typename Traits::strand_t > executor_wrapper_base_t
void handle_xxx_timeout(const char *operation_name)
void guard_write_operation()
Start guard write operation if necessary.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
stream_socket_t m_socket
Connection.
connection_input_t m_input
Input routine.
void schedule_operation_timeout_callback(std::chrono::steady_clock::time_point timeout_after, timout_cb_t timout_cb)
Helper function to work with timer guard.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
request_handler_t & m_request_handler
Request handler.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
write_group_output_ctx_t::trivial_write_operation_t trivial_write_operation_t
timout_cb_t m_current_timeout_cb
Callback to all if timeout happened.
void wait_for_http_message()
Start reading next htttp-message.
void after_read(const asio_ns::error_code &ec, std::size_t length) noexcept
Handle read operation result.
logger_t & m_logger
Logger for operation.
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void init_write()
Initiate write operation.
typename Traits::timer_manager_t timer_manager_t
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
void cancel_timeout_checking() noexcept
Stop timout guarding.
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to controll operations.
connection_t(const connection_t &)=delete
void handle_current_write_ctx() noexcept
Start/continue/continue handling output data of current write group.
void(connection_t::*)(void) timout_cb_t
Callback type for timedout operations.
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
void close() noexcept
Close connection functions.
connection_t & operator=(connection_t &&)=delete
typename Traits::logger_t logger_t
void after_write(const asio_ns::error_code &ec) noexcept
Handle write response finished.
void trigger_error_and_close(Message_Builder msg_builder) noexcept
Trigger an error.
response_coordinator_t m_response_coordinator
Response coordinator.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
typename timer_manager_t::timer_guard_t timer_guard_t
request_handler_type_from_traits_t< Traits > request_handler_t
void init_next_timeout_checking()
Schedule next timeout checking.
void consume_data(const char *data, std::size_t length)
Parse some data.
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
timer_guard_t m_timer_guard
Timer guard.
typename Traits::stream_socket_t stream_socket_t
void check_timeout_impl()
Check timed out operation.
connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
void guard_read_operation()
Statr guard read operation if necessary.
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
connection_t(connection_t &&)=delete
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
Wrapper for an executor (strand) used by connections.
Helper class for reading bytes and feeding them to parser.
Coordinator for process of sending responses with respect to http pipeline technique and chunk transf...
Socket adapter for asio::ssl::stream< asio::ip::tcp::socket >.
auto timelimit() const noexcept
Get the timelimit on this sendfile operation.
auto size() const noexcept
Get the size of sendfile operation.
auto size() const noexcept
The size of data within this operation.
Helper class for writting response data.
A type of holder of limits related to an incoming HTTP message.
connection_id_t connection_id() const noexcept
Get connection id.
Group of writable items transported to the context of underlying connection as one solid piece.
Definition buffers.hpp:727
void invoke_after_write_notificator_if_exists(const asio_ns::error_code &ec)
Get after write notificator.
Definition buffers.hpp:850
auto items_count() const noexcept
Get the count of stored items.
Definition buffers.hpp:867
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
#define RESTINIO_FMT_FORMAT_STRING(s)
llhttp_settings_t create_parser_settings() noexcept
Helper for setting parser settings.
tls_socket_t * make_tls_socket_pointer_for_state_listener(asio_ns::ip::tcp::socket &) noexcept
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
@ wait_for_upgrade_handling_result_or_nothing
Handler for request with connection-upgrade header was called so any response data comming is for tha...
@ none
No connection request in progress.
@ pending_upgrade_handling
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
asio_ns::ip::tcp::endpoint endpoint_t
An alias for endpoint type from Asio.
unsigned int request_id_t
Request id in scope of single connection.
http_field_t http_field
Helper alies to omitt _t suffix.
writable_item_type_t
Buffers write operation type.
Definition buffers.hpp:443
@ trivial_write_operation
Item is a buffer and must be written trivially.
Definition buffers.hpp:445
bool error_is_operation_aborted(const asio_ns::error_code &ec) noexcept
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
request_handling_status_t
Request handling status.
@ accepted
Request accepted for handling.
@ not_handled
The request wasn't handled. If there is another handler to be tried it should be tried....
@ rejected
Request wasn't accepted for handling.
asio_convertible_error_t
Enum for restinio errors that must presented as asio_ns::error_code value.
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
std::uint64_t connection_id_t
Type for ID of connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
A kind of metafunction that deduces actual types related to connection count limiter in the dependecy...
Bunch of data related to chunked input.
Data associated with connection read routine.
connection_input_t(std::size_t buffer_size, incoming_http_msg_limits_t limits, const llhttp_settings_t *settings)
fixed_buffer_t m_buf
Input buffer.
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
void reset_parser()
Prepare parser for reading new http-message.
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Internals that are necessary for upgrade.
connection_settings_handle_t< Traits > m_settings
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket, lifetime_monitor_t lifetime_monitor)
upgrade_internals_t(upgrade_internals_t &&)=default
Parsing result context for using in parser callbacks.
void reset()
Prepare context to handle new request.
chunked_input_info_unique_ptr_t make_chunked_input_info_if_necessary()
Creates an instance of chunked_input_info if there is an info about chunks in the body.
chunk_ext_params_unique_ptr_t m_chunk_ext_params
Chunk extnsion's params if any.
http_request_header_t m_header
Request data.
std::size_t m_total_field_count
Total number of parsed HTTP-fields.
bool m_message_complete
Flag: is http message parsed completely.
const incoming_http_msg_limits_t m_limits
Limits for the incoming message.
chunked_input_info_block_t m_chunked_info_block
std::string m_current_field_name
Parser context temp values and flags.
http_parser_ctx_t(incoming_http_msg_limits_t limits)
The main constructor.
std::size_t m_bytes_parsed
How many bytes were parsed for current request.
Response output flags for buffers commited to response-coordinator.