Line data Source code
1 : //
2 : // Copyright (c) 2019 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2024 Christian Mazakas
4 : // Copyright (c) 2024 Mohammad Nejati
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/http_proto
10 : //
11 :
12 : #include <boost/http_proto/detail/except.hpp>
13 : #include <boost/http_proto/detail/header.hpp>
14 : #include <boost/http_proto/message_base.hpp>
15 : #include <boost/http_proto/serializer.hpp>
16 :
17 : #include "src/detail/array_of_const_buffers.hpp"
18 : #include "src/detail/brotli_filter_base.hpp"
19 : #include "src/detail/buffer_utils.hpp"
20 : #include "src/detail/zlib_filter_base.hpp"
21 :
22 : #include <boost/buffers/circular_buffer.hpp>
23 : #include <boost/buffers/copy.hpp>
24 : #include <boost/core/bit.hpp>
25 : #include <boost/core/ignore_unused.hpp>
26 : #include <boost/capy/brotli/encode.hpp>
27 : #include <boost/capy/polystore.hpp>
28 : #include <boost/capy/zlib/compression_method.hpp>
29 : #include <boost/capy/zlib/compression_strategy.hpp>
30 : #include <boost/capy/zlib/deflate.hpp>
31 : #include <boost/capy/zlib/error.hpp>
32 : #include <boost/capy/zlib/flush.hpp>
33 :
34 : #include <stddef.h>
35 :
36 : namespace boost {
37 : namespace http_proto {
38 :
39 : namespace {
40 :
41 : const
42 : buffers::const_buffer
43 : crlf_and_final_chunk = {"\r\n0\r\n\r\n", 7};
44 :
45 : const
46 : buffers::const_buffer
47 : crlf = {"\r\n", 2};
48 :
49 : const
50 : buffers::const_buffer
51 : final_chunk = {"0\r\n\r\n", 5};
52 :
53 : constexpr
54 : std::uint8_t
55 76 : chunk_header_len(
56 : std::size_t max_chunk_size) noexcept
57 : {
58 : return
59 : static_cast<uint8_t>(
60 76 : (core::bit_width(max_chunk_size) + 3) / 4 +
61 76 : 2); // crlf
62 : };
63 :
64 : void
65 1145 : write_chunk_header(
66 : const buffers::mutable_buffer_pair& mbs,
67 : std::size_t size) noexcept
68 : {
69 : static constexpr char hexdig[] =
70 : "0123456789ABCDEF";
71 : char buf[18];
72 1145 : auto p = buf + 16;
73 1145 : auto const n = buffers::size(mbs);
74 5724 : for(std::size_t i = n - 2; i--;)
75 : {
76 4579 : *--p = hexdig[size & 0xf];
77 4579 : size >>= 4;
78 : }
79 1145 : buf[16] = '\r';
80 1145 : buf[17] = '\n';
81 1145 : auto copied = buffers::copy(
82 : mbs,
83 2290 : buffers::const_buffer(p, n));
84 : ignore_unused(copied);
85 1145 : BOOST_ASSERT(copied == n);
86 1145 : }
87 :
88 : //------------------------------------------------
89 :
90 : class zlib_filter
91 : : public detail::zlib_filter_base
92 : {
93 : capy::zlib::deflate_service& svc_;
94 :
95 : public:
96 52 : zlib_filter(
97 : const capy::polystore& ctx,
98 : http_proto::detail::workspace& ws,
99 : int comp_level,
100 : int window_bits,
101 : int mem_level)
102 52 : : zlib_filter_base(ws)
103 52 : , svc_(ctx.get<capy::zlib::deflate_service>())
104 : {
105 104 : system::error_code ec = static_cast<capy::zlib::error>(svc_.init2(
106 52 : strm_,
107 : comp_level,
108 : capy::zlib::deflated,
109 : window_bits,
110 : mem_level,
111 52 : capy::zlib::default_strategy));
112 52 : if(ec != capy::zlib::error::ok)
113 0 : detail::throw_system_error(ec);
114 52 : }
115 :
116 : private:
117 : virtual
118 : std::size_t
119 86 : min_out_buffer() const noexcept override
120 : {
121 : // Prevents deflate from producing
122 : // zero output due to small buffer
123 86 : return 8;
124 : }
125 :
126 : virtual
127 : results
128 4929 : do_process(
129 : buffers::mutable_buffer out,
130 : buffers::const_buffer in,
131 : bool more) noexcept override
132 : {
133 4929 : strm_.next_out = static_cast<unsigned char*>(out.data());
134 4929 : strm_.avail_out = saturate_cast(out.size());
135 4929 : strm_.next_in = static_cast<unsigned char*>(const_cast<void *>(in.data()));
136 4929 : strm_.avail_in = saturate_cast(in.size());
137 :
138 : auto rs = static_cast<capy::zlib::error>(
139 4929 : svc_.deflate(
140 4929 : strm_,
141 : more ? capy::zlib::no_flush : capy::zlib::finish));
142 :
143 4929 : results rv;
144 4929 : rv.out_bytes = saturate_cast(out.size()) - strm_.avail_out;
145 4929 : rv.in_bytes = saturate_cast(in.size()) - strm_.avail_in;
146 4929 : rv.finished = (rs == capy::zlib::error::stream_end);
147 :
148 4929 : if(rs < capy::zlib::error::ok && rs != capy::zlib::error::buf_err)
149 0 : rv.ec = rs;
150 :
151 4929 : return rv;
152 : }
153 : };
154 :
155 : class brotli_filter
156 : : public detail::brotli_filter_base
157 : {
158 : capy::brotli::encode_service& svc_;
159 : capy::brotli::encoder_state* state_;
160 :
161 : public:
162 0 : brotli_filter(
163 : const capy::polystore& ctx,
164 : http_proto::detail::workspace&,
165 : std::uint32_t comp_quality,
166 : std::uint32_t comp_window)
167 0 : : svc_(ctx.get<capy::brotli::encode_service>())
168 : {
169 : // TODO: use custom allocator
170 0 : state_ = svc_.create_instance(nullptr, nullptr, nullptr);
171 0 : if(!state_)
172 0 : detail::throw_bad_alloc();
173 : using encoder_parameter = capy::brotli::encoder_parameter;
174 0 : svc_.set_parameter(state_, encoder_parameter::quality, comp_quality);
175 0 : svc_.set_parameter(state_, encoder_parameter::lgwin, comp_window);
176 0 : }
177 :
178 0 : ~brotli_filter()
179 0 : {
180 0 : svc_.destroy_instance(state_);
181 0 : }
182 :
183 : private:
184 : virtual
185 : results
186 0 : do_process(
187 : buffers::mutable_buffer out,
188 : buffers::const_buffer in,
189 : bool more) noexcept override
190 : {
191 0 : auto* next_in = reinterpret_cast<const std::uint8_t*>(in.data());
192 0 : auto available_in = in.size();
193 0 : auto* next_out = reinterpret_cast<std::uint8_t*>(out.data());
194 0 : auto available_out = out.size();
195 :
196 : using encoder_operation =
197 : capy::brotli::encoder_operation;
198 :
199 0 : bool rs = svc_.compress_stream(
200 : state_,
201 : more ? encoder_operation::process : encoder_operation::finish,
202 : &available_in,
203 : &next_in,
204 : &available_out,
205 : &next_out,
206 : nullptr);
207 :
208 0 : results rv;
209 0 : rv.in_bytes = in.size() - available_in;
210 0 : rv.out_bytes = out.size() - available_out;
211 0 : rv.finished = svc_.is_finished(state_);
212 :
213 : // TODO: use proper error code
214 0 : if(rs == false)
215 0 : rv.ec = error::bad_payload;
216 :
217 0 : return rv;
218 : }
219 : };
220 :
221 : template<class UInt>
222 : std::size_t
223 8 : clamp(
224 : UInt x,
225 : std::size_t limit = (std::numeric_limits<
226 : std::size_t>::max)()) noexcept
227 : {
228 8 : if(x >= limit)
229 2 : return limit;
230 6 : return static_cast<std::size_t>(x);
231 : }
232 :
233 : class serializer_service
234 : {
235 : public:
236 : serializer::config cfg;
237 : std::size_t space_needed = 0;
238 :
239 24 : serializer_service(
240 : serializer::config const& cfg_)
241 24 : : cfg(cfg_)
242 : {
243 24 : space_needed += cfg.payload_buffer;
244 24 : space_needed += cfg.max_type_erase;
245 :
246 24 : if(cfg.apply_deflate_encoder || cfg.apply_gzip_encoder)
247 : {
248 : // TODO: Account for the number of allocations and
249 : // their overhead in the workspace.
250 :
251 : // https://www.zlib.net/zlib_tech.html
252 1 : space_needed +=
253 1 : (1 << (cfg.zlib_window_bits + 2)) +
254 1 : (1 << (cfg.zlib_mem_level + 9)) +
255 1 : (6 * 1024) +
256 : #ifdef __s390x__
257 : 5768 +
258 : #endif
259 1 : detail::workspace::space_needed<zlib_filter>();
260 : }
261 24 : }
262 : };
263 :
264 : } // namespace
265 :
266 : //------------------------------------------------
267 :
268 : void
269 24 : install_serializer_service(
270 : capy::polystore& ctx,
271 : serializer::config const& cfg)
272 : {
273 24 : ctx.emplace<serializer_service>(cfg);
274 24 : }
275 :
276 : //------------------------------------------------
277 :
278 : class serializer::impl
279 : {
280 : friend stream;
281 :
282 : enum class state
283 : {
284 : reset,
285 : initialized,
286 : headers_set,
287 : header,
288 : body
289 : };
290 :
291 : enum class style
292 : {
293 : empty,
294 : buffers,
295 : source,
296 : stream
297 : };
298 :
299 : const capy::polystore& ctx_;
300 : serializer_service& svc_;
301 : detail::workspace ws_;
302 :
303 : detail::filter* filter_ = nullptr;
304 : cbs_gen* cbs_gen_ = nullptr;
305 : source* source_ = nullptr;
306 :
307 : buffers::circular_buffer out_;
308 : buffers::circular_buffer in_;
309 : detail::array_of_const_buffers prepped_;
310 : buffers::const_buffer tmp_;
311 :
312 : state state_ = state::initialized;
313 : style style_ = style::empty;
314 : uint8_t chunk_header_len_ = 0;
315 : bool more_input_ = false;
316 : bool is_chunked_ = false;
317 : bool needs_exp100_continue_ = false;
318 : bool filter_done_ = false;
319 :
320 : // Stored header buffer from set_headers
321 : char const* header_cbuf_ = nullptr;
322 : std::size_t header_size_ = 0;
323 :
324 : public:
325 25 : impl(const capy::polystore& ctx)
326 25 : : ctx_(ctx)
327 25 : , svc_(ctx_.get<serializer_service>())
328 25 : , ws_(svc_.space_needed)
329 : {
330 25 : }
331 :
332 : void
333 82 : reset() noexcept
334 : {
335 82 : ws_.clear();
336 82 : state_ = state::initialized;
337 82 : }
338 :
339 : auto
340 2434 : prepare() ->
341 : system::result<const_buffers_type>
342 : {
343 : // Precondition violation
344 2434 : if(state_ < state::header)
345 2 : detail::throw_logic_error();
346 :
347 : // Expect: 100-continue
348 2432 : if(needs_exp100_continue_)
349 : {
350 4 : if(!is_header_done())
351 4 : return const_buffers_type(
352 : prepped_.begin(),
353 2 : 1); // limit to header
354 :
355 2 : needs_exp100_continue_ = false;
356 :
357 2 : BOOST_HTTP_PROTO_RETURN_EC(
358 : error::expect_100_continue);
359 : }
360 :
361 2428 : if(!filter_)
362 : {
363 62 : switch(style_)
364 : {
365 6 : case style::empty:
366 6 : break;
367 :
368 20 : case style::buffers:
369 : {
370 : // add more buffers if prepped_ is half empty.
371 30 : if(more_input_ &&
372 10 : prepped_.capacity() >= prepped_.size())
373 : {
374 4 : prepped_.slide_to_front();
375 50 : while(prepped_.capacity() != 0)
376 : {
377 48 : auto buf = cbs_gen_->next();
378 48 : if(buf.size() == 0)
379 2 : break;
380 46 : prepped_.append(buf);
381 : }
382 4 : if(cbs_gen_->is_empty())
383 : {
384 2 : if(is_chunked_)
385 : {
386 1 : if(prepped_.capacity() != 0)
387 : {
388 1 : prepped_.append(
389 : crlf_and_final_chunk);
390 1 : more_input_ = false;
391 : }
392 : }
393 : else
394 : {
395 1 : more_input_ = false;
396 : }
397 : }
398 : }
399 20 : return detail::make_span(prepped_);
400 : }
401 :
402 23 : case style::source:
403 : {
404 23 : if(out_capacity() == 0 || !more_input_)
405 22 : break;
406 :
407 7 : const auto rs = source_->read(
408 7 : out_prepare());
409 :
410 7 : out_commit(rs.bytes);
411 :
412 7 : if(rs.ec.failed())
413 : {
414 1 : ws_.clear();
415 1 : state_ = state::reset;
416 1 : return rs.ec;
417 : }
418 :
419 6 : if(rs.finished)
420 : {
421 6 : more_input_ = false;
422 6 : out_finish();
423 : }
424 :
425 6 : break;
426 : }
427 :
428 13 : case style::stream:
429 13 : if(out_.size() == 0 && is_header_done() && more_input_)
430 3 : BOOST_HTTP_PROTO_RETURN_EC(
431 : error::need_data);
432 10 : break;
433 : }
434 : }
435 : else // filter
436 : {
437 2366 : switch(style_)
438 : {
439 4 : case style::empty:
440 : {
441 4 : if(out_capacity() == 0 || filter_done_)
442 4 : break;
443 :
444 4 : const auto rs = filter_->process(
445 4 : detail::make_span(out_prepare()),
446 : {}, // empty input
447 : false);
448 :
449 4 : if(rs.ec.failed())
450 : {
451 0 : ws_.clear();
452 0 : state_ = state::reset;
453 0 : return rs.ec;
454 : }
455 :
456 4 : out_commit(rs.out_bytes);
457 :
458 4 : if(rs.finished)
459 : {
460 4 : filter_done_ = true;
461 4 : out_finish();
462 : }
463 :
464 4 : break;
465 : }
466 :
467 376 : case style::buffers:
468 : {
469 2572 : while(out_capacity() != 0 && !filter_done_)
470 : {
471 2196 : if(more_input_ && tmp_.size() == 0)
472 : {
473 2008 : tmp_ = cbs_gen_->next();
474 2008 : if(tmp_.size() == 0) // cbs_gen_ is empty
475 16 : more_input_ = false;
476 : }
477 :
478 2196 : const auto rs = filter_->process(
479 2196 : detail::make_span(out_prepare()),
480 : {{ {tmp_}, {} }},
481 2196 : more_input_);
482 :
483 2196 : if(rs.ec.failed())
484 : {
485 0 : ws_.clear();
486 0 : state_ = state::reset;
487 0 : return rs.ec;
488 : }
489 :
490 2196 : buffers::remove_prefix(tmp_, rs.in_bytes);
491 2196 : out_commit(rs.out_bytes);
492 :
493 2196 : if(rs.out_short)
494 0 : break;
495 :
496 2196 : if(rs.finished)
497 : {
498 16 : filter_done_ = true;
499 16 : out_finish();
500 : }
501 : }
502 376 : break;
503 : }
504 :
505 734 : case style::source:
506 : {
507 2179 : while(out_capacity() != 0 && !filter_done_)
508 : {
509 1445 : if(more_input_ && in_.capacity() != 0)
510 : {
511 984 : const auto rs = source_->read(
512 984 : in_.prepare(in_.capacity()));
513 984 : if(rs.ec.failed())
514 : {
515 0 : ws_.clear();
516 0 : state_ = state::reset;
517 0 : return rs.ec;
518 : }
519 984 : if(rs.finished)
520 16 : more_input_ = false;
521 984 : in_.commit(rs.bytes);
522 : }
523 :
524 1445 : const auto rs = filter_->process(
525 1445 : detail::make_span(out_prepare()),
526 : in_.data(),
527 1445 : more_input_);
528 :
529 1445 : if(rs.ec.failed())
530 : {
531 0 : ws_.clear();
532 0 : state_ = state::reset;
533 0 : return rs.ec;
534 : }
535 :
536 1445 : in_.consume(rs.in_bytes);
537 1445 : out_commit(rs.out_bytes);
538 :
539 1445 : if(rs.out_short)
540 0 : break;
541 :
542 1445 : if(rs.finished)
543 : {
544 16 : filter_done_ = true;
545 16 : out_finish();
546 : }
547 : }
548 734 : break;
549 : }
550 :
551 1252 : case style::stream:
552 : {
553 1252 : if(out_capacity() == 0 || filter_done_)
554 804 : break;
555 :
556 1252 : const auto rs = filter_->process(
557 1252 : detail::make_span(out_prepare()),
558 : in_.data(),
559 1252 : more_input_);
560 :
561 1252 : if(rs.ec.failed())
562 : {
563 0 : ws_.clear();
564 0 : state_ = state::reset;
565 448 : return rs.ec;
566 : }
567 :
568 1252 : in_.consume(rs.in_bytes);
569 1252 : out_commit(rs.out_bytes);
570 :
571 1252 : if(rs.finished)
572 : {
573 16 : filter_done_ = true;
574 16 : out_finish();
575 : }
576 :
577 1252 : if(out_.size() == 0 && is_header_done() && more_input_)
578 448 : BOOST_HTTP_PROTO_RETURN_EC(
579 : error::need_data);
580 804 : break;
581 : }
582 : }
583 : }
584 :
585 1956 : prepped_.reset(!is_header_done());
586 5868 : for(auto const& cb : out_.data())
587 : {
588 3912 : if(cb.size() != 0)
589 1948 : prepped_.append(cb);
590 : }
591 1956 : return detail::make_span(prepped_);
592 : }
593 :
594 : void
595 3717 : consume(
596 : std::size_t n)
597 : {
598 : // Precondition violation
599 3717 : if(state_ < state::header)
600 1 : detail::throw_logic_error();
601 :
602 3716 : if(!is_header_done())
603 : {
604 : const auto header_remain =
605 85 : prepped_[0].size();
606 85 : if(n < header_remain)
607 : {
608 12 : prepped_.consume(n);
609 12 : return;
610 : }
611 73 : n -= header_remain;
612 73 : prepped_.consume(header_remain);
613 73 : state_ = state::body;
614 : }
615 :
616 3704 : prepped_.consume(n);
617 :
618 : // no-op when out_ is not in use
619 3704 : out_.consume(n);
620 :
621 3704 : if(!prepped_.empty())
622 1759 : return;
623 :
624 1945 : if(more_input_)
625 1837 : return;
626 :
627 108 : if(filter_ && !filter_done_)
628 34 : return;
629 :
630 74 : if(needs_exp100_continue_)
631 1 : return;
632 :
633 : // ready for next message
634 73 : reset();
635 : }
636 :
637 : void
638 84 : set_headers(
639 : message_base const& m)
640 : {
641 : // Precondition violation
642 84 : if(state_ != state::initialized)
643 1 : detail::throw_logic_error();
644 :
645 : // TODO: To uphold the strong exception guarantee,
646 : // `state_` must be reset to `state::initialized` if an
647 : // exception is thrown during the start operation.
648 83 : state_ = state::headers_set;
649 :
650 : // VFALCO what do we do with
651 : // metadata error code failures?
652 : // m.h_.md.maybe_throw();
653 :
654 83 : auto const& md = m.metadata();
655 83 : needs_exp100_continue_ = md.expect.is_100_continue;
656 :
657 : // Transfer-Encoding
658 83 : is_chunked_ = md.transfer_encoding.is_chunked;
659 :
660 : // Content-Encoding
661 83 : switch (md.content_encoding.coding)
662 : {
663 26 : case content_coding::deflate:
664 26 : if(!svc_.cfg.apply_deflate_encoder)
665 0 : goto no_filter;
666 52 : filter_ = &ws_.emplace<zlib_filter>(
667 : ctx_,
668 26 : ws_,
669 26 : svc_.cfg.zlib_comp_level,
670 26 : svc_.cfg.zlib_window_bits,
671 26 : svc_.cfg.zlib_mem_level);
672 26 : filter_done_ = false;
673 26 : break;
674 :
675 26 : case content_coding::gzip:
676 26 : if(!svc_.cfg.apply_gzip_encoder)
677 0 : goto no_filter;
678 52 : filter_ = &ws_.emplace<zlib_filter>(
679 : ctx_,
680 26 : ws_,
681 26 : svc_.cfg.zlib_comp_level,
682 52 : svc_.cfg.zlib_window_bits + 16,
683 26 : svc_.cfg.zlib_mem_level);
684 26 : filter_done_ = false;
685 26 : break;
686 :
687 0 : case content_coding::br:
688 0 : if(!svc_.cfg.apply_brotli_encoder)
689 0 : goto no_filter;
690 0 : filter_ = &ws_.emplace<brotli_filter>(
691 : ctx_,
692 0 : ws_,
693 0 : svc_.cfg.brotli_comp_quality,
694 0 : svc_.cfg.brotli_comp_window);
695 0 : filter_done_ = false;
696 0 : break;
697 :
698 0 : no_filter:
699 31 : default:
700 31 : filter_ = nullptr;
701 31 : break;
702 : }
703 :
704 : // Store header buffer for later use
705 83 : header_cbuf_ = m.h_.cbuf;
706 83 : header_size_ = m.h_.size;
707 83 : }
708 :
709 : void
710 12 : start_empty(
711 : message_base const& m)
712 : {
713 12 : set_headers(m);
714 11 : start_empty_impl();
715 11 : }
716 :
717 : void
718 : start_buffers(
719 : message_base const& m,
720 : cbs_gen& cbs_gen)
721 : {
722 : set_headers(m);
723 : start_buffers_impl(cbs_gen);
724 : }
725 :
726 : void
727 : start_source(
728 : message_base const& m,
729 : source& source)
730 : {
731 : set_headers(m);
732 : start_source_impl(source);
733 : }
734 :
735 : stream
736 23 : start_stream(message_base const& m)
737 : {
738 23 : set_headers(m);
739 23 : return start_stream_impl();
740 : }
741 :
742 : // New methods that use stored header from set_headers()
743 :
744 : void
745 11 : start_empty_impl()
746 : {
747 : // Precondition violation
748 11 : if(state_ != state::headers_set)
749 0 : detail::throw_logic_error();
750 :
751 11 : state_ = state::header;
752 11 : style_ = style::empty;
753 :
754 11 : prepped_ = make_array(
755 : 1 + // header
756 : 2); // out buffer pairs
757 :
758 11 : out_init();
759 :
760 11 : if(!filter_)
761 7 : out_finish();
762 :
763 11 : prepped_.append({ header_cbuf_, header_size_ });
764 11 : more_input_ = false;
765 11 : }
766 :
767 : void
768 24 : start_buffers_impl(
769 : cbs_gen& cbs_gen)
770 : {
771 : // Precondition violation
772 24 : if(state_ != state::headers_set)
773 0 : detail::throw_logic_error();
774 :
775 24 : state_ = state::header;
776 24 : style_ = style::buffers;
777 24 : cbs_gen_ = &cbs_gen;
778 :
779 24 : if(!filter_)
780 : {
781 8 : auto stats = cbs_gen_->stats();
782 8 : auto batch_size = clamp(stats.count, 16);
783 :
784 0 : prepped_ = make_array(
785 : 1 + // header
786 8 : batch_size + // buffers
787 8 : (is_chunked_ ? 2 : 0)); // chunk header + final chunk
788 :
789 8 : prepped_.append({ header_cbuf_, header_size_ });
790 8 : more_input_ = (batch_size != 0);
791 :
792 8 : if(is_chunked_)
793 : {
794 2 : if(!more_input_)
795 : {
796 1 : prepped_.append(final_chunk);
797 : }
798 : else
799 : {
800 1 : auto h_len = chunk_header_len(stats.size);
801 : buffers::mutable_buffer mb(
802 1 : ws_.reserve_front(h_len), h_len);
803 1 : write_chunk_header({{ {mb}, {} }}, stats.size);
804 1 : prepped_.append(mb);
805 : }
806 : }
807 8 : return;
808 : }
809 :
810 : // filter
811 :
812 16 : prepped_ = make_array(
813 : 1 + // header
814 : 2); // out buffer pairs
815 :
816 16 : out_init();
817 :
818 16 : prepped_.append({ header_cbuf_, header_size_ });
819 16 : tmp_ = {};
820 16 : more_input_ = true;
821 : }
822 :
823 : void
824 25 : start_source_impl(
825 : source& source)
826 : {
827 : // Precondition violation
828 25 : if(state_ != state::headers_set)
829 0 : detail::throw_logic_error();
830 :
831 25 : state_ = state::header;
832 25 : style_ = style::source;
833 25 : source_ = &source;
834 :
835 25 : prepped_ = make_array(
836 : 1 + // header
837 : 2); // out buffer pairs
838 :
839 25 : if(filter_)
840 : {
841 : // TODO: smarter buffer distribution
842 16 : auto const n = (ws_.size() - 1) / 2;
843 16 : in_ = { ws_.reserve_front(n), n };
844 : }
845 :
846 25 : out_init();
847 :
848 25 : prepped_.append({ header_cbuf_, header_size_ });
849 25 : more_input_ = true;
850 25 : }
851 :
852 : stream
853 23 : start_stream_impl()
854 : {
855 : // Precondition violation
856 23 : if(state_ != state::headers_set)
857 0 : detail::throw_logic_error();
858 :
859 23 : state_ = state::header;
860 23 : style_ = style::stream;
861 :
862 23 : prepped_ = make_array(
863 : 1 + // header
864 : 2); // out buffer pairs
865 :
866 23 : if(filter_)
867 : {
868 : // TODO: smarter buffer distribution
869 16 : auto const n = (ws_.size() - 1) / 2;
870 16 : in_ = { ws_.reserve_front(n), n };
871 : }
872 :
873 23 : out_init();
874 :
875 23 : prepped_.append({ header_cbuf_, header_size_ });
876 23 : more_input_ = true;
877 23 : return stream{ this };
878 : }
879 :
880 : bool
881 2487 : is_done() const noexcept
882 : {
883 2487 : return state_ == state::initialized;
884 : }
885 :
886 : bool
887 0 : is_headers_set() const noexcept
888 : {
889 0 : return state_ == state::headers_set;
890 : }
891 :
892 : detail::workspace&
893 49 : ws() noexcept
894 : {
895 49 : return ws_;
896 : }
897 :
898 : private:
899 : bool
900 6131 : is_header_done() const noexcept
901 : {
902 6131 : return state_ == state::body;
903 : }
904 :
905 : detail::array_of_const_buffers
906 83 : make_array(std::size_t n)
907 : {
908 83 : BOOST_ASSERT(n <= std::uint16_t(-1));
909 :
910 : return {
911 83 : ws_.push_array(n,
912 0 : buffers::const_buffer{}),
913 83 : static_cast<std::uint16_t>(n) };
914 : }
915 :
916 : void
917 75 : out_init()
918 : {
919 : // use all the remaining buffer
920 75 : auto const n = ws_.size() - 1;
921 75 : out_ = { ws_.reserve_front(n), n };
922 75 : chunk_header_len_ =
923 75 : chunk_header_len(out_.capacity());
924 75 : if(out_capacity() == 0)
925 0 : detail::throw_length_error();
926 75 : }
927 :
928 : buffers::mutable_buffer_pair
929 4910 : out_prepare() noexcept
930 : {
931 4910 : auto mbp = out_.prepare(out_.capacity());
932 4910 : if(is_chunked_)
933 : {
934 2454 : buffers::remove_prefix(
935 2454 : mbp, chunk_header_len_);
936 2454 : buffers::remove_suffix(
937 : mbp, crlf_and_final_chunk.size());
938 : }
939 4910 : return mbp;
940 : }
941 :
942 : void
943 4910 : out_commit(
944 : std::size_t n) noexcept
945 : {
946 4910 : if(is_chunked_)
947 : {
948 2454 : if(n == 0)
949 1310 : return;
950 :
951 1144 : write_chunk_header(out_.prepare(chunk_header_len_), n);
952 1144 : out_.commit(chunk_header_len_);
953 :
954 1144 : out_.prepare(n);
955 1144 : out_.commit(n);
956 :
957 1144 : buffers::copy(out_.prepare(crlf.size()), crlf);
958 1144 : out_.commit(crlf.size());
959 : }
960 : else
961 : {
962 2456 : out_.commit(n);
963 : }
964 : }
965 :
966 : std::size_t
967 6129 : out_capacity() const noexcept
968 : {
969 6129 : if(is_chunked_)
970 : {
971 3061 : auto const overhead = chunk_header_len_ +
972 3061 : crlf_and_final_chunk.size();
973 3061 : if(out_.capacity() < overhead)
974 541 : return 0;
975 2520 : return out_.capacity() - overhead;
976 : }
977 3068 : return out_.capacity();
978 : }
979 :
980 : void
981 72 : out_finish() noexcept
982 : {
983 72 : if(is_chunked_)
984 : {
985 33 : buffers::copy(
986 33 : out_.prepare(final_chunk.size()), final_chunk);
987 33 : out_.commit(final_chunk.size());
988 : }
989 72 : }
990 : };
991 :
992 : //------------------------------------------------
993 :
994 31 : serializer::
995 : ~serializer()
996 : {
997 31 : delete impl_;
998 31 : }
999 :
1000 1 : serializer::
1001 1 : serializer(serializer&& other) noexcept
1002 1 : : impl_(other.impl_)
1003 : {
1004 1 : other.impl_ = nullptr;
1005 1 : }
1006 :
1007 : serializer&
1008 2 : serializer::
1009 : operator=(serializer&& other) noexcept
1010 : {
1011 2 : if(this != &other)
1012 : {
1013 2 : delete impl_;
1014 2 : impl_ = other.impl_;
1015 2 : other.impl_ = nullptr;
1016 : }
1017 2 : return *this;
1018 : }
1019 :
1020 25 : serializer::
1021 25 : serializer(capy::polystore& ctx)
1022 25 : : impl_(new impl(ctx))
1023 : {
1024 : // TODO: use a single allocation for
1025 : // impl and workspace buffer.
1026 25 : }
1027 :
1028 : void
1029 9 : serializer::
1030 : reset() noexcept
1031 : {
1032 9 : BOOST_ASSERT(impl_);
1033 9 : impl_->reset();
1034 9 : }
1035 :
1036 : void
1037 12 : serializer::
1038 : start(message_base const& m)
1039 : {
1040 12 : BOOST_ASSERT(impl_);
1041 12 : impl_->start_empty(m);
1042 11 : }
1043 :
1044 : void
1045 49 : serializer::
1046 : set_headers(message_base const& m)
1047 : {
1048 49 : BOOST_ASSERT(impl_);
1049 49 : impl_->set_headers(m);
1050 49 : }
1051 :
1052 : void
1053 0 : serializer::
1054 : start()
1055 : {
1056 0 : BOOST_ASSERT(impl_);
1057 0 : impl_->start_empty_impl();
1058 0 : }
1059 :
1060 : auto
1061 23 : serializer::
1062 : start_stream(
1063 : message_base const& m) -> stream
1064 : {
1065 23 : BOOST_ASSERT(impl_);
1066 23 : return impl_->start_stream(m);
1067 : }
1068 :
1069 : auto
1070 0 : serializer::
1071 : start_stream() -> stream
1072 : {
1073 0 : BOOST_ASSERT(impl_);
1074 0 : return impl_->start_stream_impl();
1075 : }
1076 :
1077 : auto
1078 2434 : serializer::
1079 : prepare() ->
1080 : system::result<const_buffers_type>
1081 : {
1082 2434 : BOOST_ASSERT(impl_);
1083 2434 : return impl_->prepare();
1084 : }
1085 :
1086 : void
1087 3717 : serializer::
1088 : consume(std::size_t n)
1089 : {
1090 3717 : BOOST_ASSERT(impl_);
1091 3717 : impl_->consume(n);
1092 3716 : }
1093 :
1094 : bool
1095 2487 : serializer::
1096 : is_done() const noexcept
1097 : {
1098 2487 : BOOST_ASSERT(impl_);
1099 2487 : return impl_->is_done();
1100 : }
1101 :
1102 : bool
1103 0 : serializer::
1104 : is_headers_set() const noexcept
1105 : {
1106 0 : BOOST_ASSERT(impl_);
1107 0 : return impl_->is_headers_set();
1108 : }
1109 :
1110 : //------------------------------------------------
1111 :
1112 : detail::workspace&
1113 49 : serializer::
1114 : ws()
1115 : {
1116 49 : BOOST_ASSERT(impl_);
1117 49 : return impl_->ws();
1118 : }
1119 :
1120 : void
1121 24 : serializer::
1122 : start_buffers_impl(
1123 : cbs_gen& cbs_gen)
1124 : {
1125 24 : BOOST_ASSERT(impl_);
1126 24 : impl_->start_buffers_impl(cbs_gen);
1127 24 : }
1128 :
1129 : void
1130 25 : serializer::
1131 : start_source_impl(
1132 : source& source)
1133 : {
1134 25 : BOOST_ASSERT(impl_);
1135 25 : impl_->start_source_impl(source);
1136 25 : }
1137 :
1138 : //------------------------------------------------
1139 :
1140 : std::size_t
1141 1265 : serializer::
1142 : stream::
1143 : capacity() const
1144 : {
1145 : // Precondition violation
1146 1265 : if(!is_open())
1147 1 : detail::throw_logic_error();
1148 :
1149 1264 : if(impl_->filter_)
1150 1240 : return impl_->in_.capacity();
1151 :
1152 24 : return impl_->out_capacity();
1153 : }
1154 :
1155 : auto
1156 1247 : serializer::
1157 : stream::
1158 : prepare() ->
1159 : mutable_buffers_type
1160 : {
1161 : // Precondition violation
1162 1247 : if(!is_open())
1163 1 : detail::throw_logic_error();
1164 :
1165 1246 : if(impl_->filter_)
1166 1240 : return impl_->in_.prepare(
1167 2480 : impl_->in_.capacity());
1168 :
1169 6 : return impl_->out_prepare();
1170 : }
1171 :
1172 : void
1173 1248 : serializer::
1174 : stream::
1175 : commit(std::size_t n)
1176 : {
1177 : // Precondition violation
1178 1248 : if(!is_open())
1179 1 : detail::throw_logic_error();
1180 :
1181 : // Precondition violation
1182 1247 : if(n > capacity())
1183 1 : detail::throw_invalid_argument();
1184 :
1185 1246 : if(impl_->filter_)
1186 1240 : return impl_->in_.commit(n);
1187 :
1188 6 : impl_->out_commit(n);
1189 : }
1190 :
1191 : void
1192 47 : serializer::
1193 : stream::
1194 : close() noexcept
1195 : {
1196 47 : if(!is_open())
1197 24 : return; // no-op;
1198 :
1199 23 : if(!impl_->filter_)
1200 7 : impl_->out_finish();
1201 :
1202 23 : impl_->more_input_ = false;
1203 23 : impl_ = nullptr;
1204 : }
1205 :
1206 : } // http_proto
1207 : } // boost
|