client.cpp 80.6 KB
Newer Older
Peter Jankuliak's avatar
Peter Jankuliak committed
1
2
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
3
#include <boost/asio/connect.hpp>
4
#include <boost/asio/signal_set.hpp>
5
6
7
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
8
#include <boost/date_time/posix_time/posix_time.hpp>
9
#include <boost/format.hpp>
10
11
#include <boost/asio/ssl.hpp>
#include <boost/asio/ssl/stream.hpp>
12
#include <boost/optional/optional_io.hpp>
13
14
15
16
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/indexed.hpp>
Peter Jankuliak's avatar
Peter Jankuliak committed
17
#include <iostream>
18
#include <cstdlib>  // for atexit()
Peter Jankuliak's avatar
Peter Jankuliak committed
19

20
#include "cache/bep5_http/client.h"
21

22
#include "namespaces.h"
23
#include "origin_pools.h"
24
#include "http_util.h"
25
#include "fetch_http_page.h"
26
#include "client_front_end.h"
27
#include "generic_stream.h"
28
#include "util.h"
29
#include "async_sleep.h"
30
#include "endpoint.h"
31
#include "or_throw.h"
32
#include "request_routing.h"
33
#include "full_duplex_forward.h"
34
#include "client_config.h"
Peter Jankuliak's avatar
Peter Jankuliak committed
35
#include "client.h"
36
#include "authenticate.h"
37
#include "defer.h"
38
#include "default_timeout.h"
39
#include "constants.h"
40
#include "util/async_queue_reader.h"
41
#include "session.h"
42
#include "create_udp_multiplexer.h"
43
#include "ssl/ca_certificate.h"
44
#include "ssl/dummy_certificate.h"
45
#include "ssl/util.h"
46
#include "bittorrent/dht.h"
47
#include "bittorrent/mutable_data.h"
Peter Jankuliak's avatar
Peter Jankuliak committed
48

49
#ifndef __ANDROID__
Peter Jankuliak's avatar
Peter Jankuliak committed
50
#  include "force_exit_on_signal.h"
51
#endif // ifndef __ANDROID__
52

53
54
#include "ouiservice.h"
#include "ouiservice/i2p.h"
55
#include "ouiservice/lampshade.h"
56
57
#include "ouiservice/pt-obfs2.h"
#include "ouiservice/pt-obfs3.h"
58
#include "ouiservice/pt-obfs4.h"
59
#include "ouiservice/tcp.h"
60
#include "ouiservice/utp.h"
61
#include "ouiservice/tls.h"
62
#include "ouiservice/weak_client.h"
63
#include "ouiservice/bep5/client.h"
64
#include "ouiservice/multi_utp_server.h"
65

66
#include "parse/number.h"
67
#include "util/signal.h"
68
#include "util/crypto.h"
69
#include "util/lru_cache.h"
70
#include "util/scheduler.h"
71
#include "util/reachability.h"
72
#include "util/async_job.h"
73
#include "upnp.h"
74
#include "util/handler_tracker.h"
Peter Jankuliak's avatar
Peter Jankuliak committed
75

76
77
#include "logger.h"

78
79
using namespace std;
using namespace ouinet;
Peter Jankuliak's avatar
Peter Jankuliak committed
80

81
namespace posix_time = boost::posix_time;
82
namespace bt = ouinet::bittorrent;
83

84
85
86
using tcp      = asio::ip::tcp;
using Request  = http::request<http::string_body>;
using Response = http::response<http::dynamic_body>;
Peter Jankuliak's avatar
Peter Jankuliak committed
87

88
89
90
static const fs::path OUINET_CA_CERT_FILE = "ssl-ca-cert.pem";
static const fs::path OUINET_CA_KEY_FILE = "ssl-ca-key.pem";
static const fs::path OUINET_CA_DH_FILE = "ssl-ca-dh.pem";
Peter Jankuliak's avatar
Peter Jankuliak committed
91

92
static bool log_transactions() {
93
94
    return logger.get_threshold() <= DEBUG
        || logger.get_log_file() != nullptr;
95
96
}

Peter Jankuliak's avatar
Peter Jankuliak committed
97
98
99
100
101
102
103
104
105
//------------------------------------------------------------------------------
struct UserAgentMetaData {
    boost::optional<bool> is_private;
    boost::optional<std::string> dht_group;

    static UserAgentMetaData extract(Request& rq) {
        UserAgentMetaData ret;

        {
106
            auto i = rq.find(http_::request_group_hdr);
Peter Jankuliak's avatar
Peter Jankuliak committed
107
108
109
110
111
112
            if (i != rq.end()) {
                ret.dht_group = i->value().to_string();
                rq.erase(i);
            }
        }
        {
113
            auto i = rq.find(http_::request_private_hdr);
Peter Jankuliak's avatar
Peter Jankuliak committed
114
            if (i != rq.end()) {
115
                ret.is_private = boost::iequals(i->value(), http_::request_private_true);
Peter Jankuliak's avatar
Peter Jankuliak committed
116
117
118
119
120
121
122
123
                rq.erase(i);
            }
        }

        return ret;
    }
};

124
//------------------------------------------------------------------------------
125
// Internal state of the client: configuration, TLS contexts, connection
// pools and the lazily created UDP/DHT/uTP services, plus the request
// handling entry points implemented further down in this file.
class Client::State : public enable_shared_from_this<Client::State> {
    friend class Client;

public:
    State(asio::io_context& ctx, ClientConfig cfg)
        : _ctx(ctx)
        , _config(move(cfg))
        // A certificate chain with OUINET_CA + SUBJECT_CERT
        // can be around 2 KiB, so this would be around 2 MiB.
        // TODO: Fine tune if necessary.
        , _ssl_certificate_cache(1000)
        , ssl_ctx{asio::ssl::context::tls_client}
        , inj_ctx{asio::ssl::context::tls_client}
    {
        ssl_ctx.set_default_verify_paths();
        ssl_ctx.set_verify_mode(asio::ssl::verify_peer);

        // We do *not* want to do this since
        // we will not be checking certificate names,
        // thus any certificate signed by a recognized CA
        // would be accepted if presented by an injector.
        //inj_ctx.set_default_verify_paths();
        inj_ctx.set_verify_mode(asio::ssl::verify_peer);
    }

    void start();

    // Release the cache and UPnP updaters, fire the shutdown signal and
    // stop the injector client, the DHT and the reachability analysis.
    void stop() {
        _bep5_http_cache = nullptr;
        _upnps.clear();
        _shutdown_signal();
        if (_injector) _injector->stop();
        if (_bt_dht) {
            _bt_dht->stop();
            _bt_dht = nullptr;
        }
        if (_udp_reachability) {
            _udp_reachability->stop();
            _udp_reachability = nullptr;
        }
    }

    void setup_cache();
    void set_injector(string);

    // Return the shared UDP multiplexer, creating it on first use
    // (which also starts the UDP reachability analysis on it).
    const asio_utp::udp_multiplexer& common_udp_multiplexer()
    {
        if (_udp_multiplexer) return *_udp_multiplexer;

        _udp_multiplexer
            = create_udp_multiplexer( _ctx
                                    , _config.repo_root() / "last_used_udp_port");

        _udp_reachability
            = make_unique<util::UdpServerReachabilityAnalysis>();
        _udp_reachability->start(get_executor(), *_udp_multiplexer);

        return *_udp_multiplexer;
    }

    // Return the BitTorrent DHT, creating it on first use and binding it
    // to the common UDP multiplexer.  On failure the error is reported
    // through `yield` and the (still unset) `_bt_dht` is returned.
    std::shared_ptr<bt::MainlineDht> bittorrent_dht(asio::yield_context yield)
    {
        if (_bt_dht) return _bt_dht;

        auto bt_dht = make_shared<bt::MainlineDht>( _ctx.get_executor()
                                                  , _config.repo_root() / "dht");

        auto& mpl = common_udp_multiplexer();

        asio_utp::udp_multiplexer m(_ctx);
        sys::error_code ec;

        m.bind(mpl, ec);
        if (ec) return or_throw(yield, ec, _bt_dht);

        // Drop the half-initialized DHT if shutdown happens
        // while its endpoint is being set below.
        auto cc = _shutdown_signal.connect([&] { bt_dht.reset(); });

        auto ext_ep = bt_dht->set_endpoint(move(m), yield[ec]);
        if (ec) return or_throw(yield, ec, _bt_dht);

        setup_upnp(ext_ep.port(), mpl.local_endpoint());

        _bt_dht = move(bt_dht);
        return _bt_dht;
    }

    http::response<http::string_body>
    retrieval_failure_response(const Request&);

private:
    GenericStream ssl_mitm_handshake( GenericStream&&
                                    , const Request&
                                    , asio::yield_context);

    void serve_request(GenericStream&& con, asio::yield_context yield);

    // All `fetch_*` functions below take care of keeping or dropping
    // Ouinet-specific internal HTTP headers as expected by upper layers.

    CacheEntry
    fetch_stored_in_dcache( const Request& request
                          , const request_route::Config& request_config
                          , const std::string& dht_group
                          , Cancel& cancel
                          , Yield yield);

    Response fetch_fresh_from_front_end(const Request&, Yield);
    Session fetch_fresh_from_origin(const Request&, Cancel, Yield);

    Session fetch_fresh_through_connect_proxy(const Request&, Cancel&, Yield);

    Session fetch_fresh_through_simple_proxy( Request
                                            , bool can_inject
                                            , Cancel& cancel
                                            , Yield);

    // Add a warning header to `res` if a protocol version newer than
    // the current one has been seen (by this client or by its cache).
    template<class Resp>
    void maybe_add_proto_version_warning(Resp& res) const {
        auto newest = newest_proto_seen;
        // Check if cache client knows about a newer protocol version too.
        auto c = get_cache();
        if (c && c->get_newest_proto_version() > newest)
            newest = c->get_newest_proto_version();
        if (newest > http_::protocol_version_current)
            res.set( http_::response_warning_hdr
                   , "Newer Ouinet protocol found in network, "
                     "please consider upgrading.");
    }

    CacheControl build_cache_control(request_route::Config& request_config);

    void listen_tcp( asio::yield_context
                   , tcp::endpoint
                   , const char* service
                   , function<void(GenericStream, asio::yield_context)>);

    void setup_injector(asio::yield_context);

    // True once the shutdown signal has been fired at least once.
    bool was_stopped() const {
        return _shutdown_signal.call_count() != 0;
    }

    fs::path ca_cert_path() const { return _config.repo_root() / OUINET_CA_CERT_FILE; }
    fs::path ca_key_path()  const { return _config.repo_root() / OUINET_CA_KEY_FILE;  }
    fs::path ca_dh_path()   const { return _config.repo_root() / OUINET_CA_DH_FILE;   }

    asio::io_context& get_io_context() { return _ctx; }
    asio::executor get_executor() { return _ctx.get_executor(); }

    Signal<void()>& get_shutdown_signal() { return _shutdown_signal; }

    bool maybe_handle_websocket_upgrade( GenericStream&
                                       , beast::string_view connect_host_port
                                       , Request&
                                       , Yield);

    GenericStream connect_to_origin(const Request&, Cancel&, Yield);

    unique_ptr<OuiServiceImplementationClient>
    maybe_wrap_tls(unique_ptr<OuiServiceImplementationClient>);

    // Non-owning pointer to the cache client; null if there is none.
    cache::bep5_http::Client* get_cache() const { return _bep5_http_cache.get(); }

    void serve_utp_request(GenericStream, Yield);

    // Create a UPnP updater mapping `ext_port` to the port of `local_ep`,
    // unless shutting down, the endpoint is not IPv4,
    // or an updater for that endpoint already exists.
    void setup_upnp(uint16_t ext_port, asio::ip::udp::endpoint local_ep) {
        if (_shutdown_signal) return;

        if (!local_ep.address().is_v4()) {
            LOG_WARN("Not setting up UPnP redirection because endpoint is not ipv4");
            return;
        }

        auto& p = _upnps[local_ep];

        if (p) {
            LOG_WARN("UPnP redirection for ", local_ep, " is already set");
            return;
        }

        p = make_unique<UPnPUpdater>(_ctx.get_executor(), ext_port, local_ep.port());
    }

    // Start accepting uTP connections on the DHT's local endpoints.
    // Does nothing if the multi uTP server already exists.
    void idempotent_start_accepting_on_utp(asio::yield_context yield) {
        if (_multi_utp_server) return;
        assert(!_shutdown_signal);
        if (_shutdown_signal) return or_throw(yield, asio::error::operation_aborted);

        sys::error_code ec;
        auto dht = bittorrent_dht(yield[ec]);
        if (ec) return or_throw(yield, ec);

        auto exec = _ctx.get_executor();
        auto local_eps = dht->local_endpoints();

        _multi_utp_server
            = make_unique<ouiservice::MultiUtpServer>(exec, local_eps, nullptr);

        TRACK_SPAWN(_ctx, ([&, c = _shutdown_signal] (asio::yield_context yield) mutable {
            auto slot = c.connect([&] () mutable { _multi_utp_server = nullptr; });

            sys::error_code ec;
            _multi_utp_server->start_listen(yield[ec]);

            if (ec) {
                LOG_ERROR("Failed to start accepting on multi uTP service: ", ec.message());
                return;
            }

            while (!c) {
                sys::error_code ec;
                auto con = _multi_utp_server->accept(yield[ec]);

                if (c) return;
                if (ec == asio::error::operation_aborted) return;
                if (ec) {
                    LOG_WARN("Bep5Http: Failure to accept:", ec.message());
                    async_sleep(_ctx, 200ms, c, yield);
                    continue;
                }

                // The per-connection coroutine can outlive this loop
                // iteration (and this whole accept coroutine), so it must
                // own its error code and must not capture anything from
                // this stack by reference.
                TRACK_SPAWN(_ctx, ([this, con = move(con)]
                                   (asio::yield_context yield) mutable {
                    sys::error_code ec;
                    Yield y(_ctx, yield, "uTPAccept");
                    serve_utp_request(move(con), y[ec]);
                }));
            }
        }));
    }

private:
    // The newest protocol version number seen in a trusted exchange
    // (i.e. from an injector exchange or injector-signed cached content).
    unsigned newest_proto_seen = http_::protocol_version_current;

    asio::io_context& _ctx;
    ClientConfig _config;
    std::unique_ptr<CACertificate> _ca_certificate;
    util::LruCache<string, string> _ssl_certificate_cache;
    std::unique_ptr<OuiServiceClient> _injector;
    std::unique_ptr<cache::bep5_http::Client> _bep5_http_cache;

    ClientFrontEnd _front_end;
    Signal<void()> _shutdown_signal;

    // For debugging
    uint64_t _next_connection_id = 0;
    ConnectionPool<Endpoint> _injector_connections;
    OriginPools _origin_pools;

    // TLS context for origins (verifies against the default CA paths).
    asio::ssl::context ssl_ctx;
    // TLS context for injectors (default CA paths deliberately not loaded).
    asio::ssl::context inj_ctx;

    boost::optional<asio::ip::udp::endpoint> _local_utp_endpoint;
    boost::optional<asio_utp::udp_multiplexer> _udp_multiplexer;
    unique_ptr<util::UdpServerReachabilityAnalysis> _udp_reachability;
    shared_ptr<bt::MainlineDht> _bt_dht;

    unique_ptr<ouiservice::MultiUtpServer> _multi_utp_server;
    shared_ptr<ouiservice::Bep5Client> _bep5_client;

    // UPnP updaters, keyed by the local UDP endpoint they redirect to.
    std::map<asio::ip::udp::endpoint, unique_ptr<UPnPUpdater>> _upnps;
};
Peter Jankuliak's avatar
Peter Jankuliak committed
386
387

//------------------------------------------------------------------------------
388
template<class Resp>
Peter Jankuliak's avatar
Peter Jankuliak committed
389
static
390
void handle_http_error( GenericStream& con
391
                      , Resp& res
392
                      , Yield yield)
Peter Jankuliak's avatar
Peter Jankuliak committed
393
{
394
395
396
397
    if (log_transactions()) {
        yield.log("=== Sending back response ===");
        yield.log(res);
    }
398

399
    http::async_write(con, res, yield);
Peter Jankuliak's avatar
Peter Jankuliak committed
400
401
}

402
template<class ReqBody>
403
404
static
void handle_bad_request( GenericStream& con
405
                       , const http::request<ReqBody>& req
406
                       , const string& message
407
408
                       , Yield yield)
{
409
    auto res = util::http_client_error(req, http::status::bad_request, "", message);
410
    auto yield_ = yield.tag("handle_bad_request");
411
    return handle_http_error(con, res, yield_);
412
413
}

414
415
//------------------------------------------------------------------------------
void
416
Client::State::serve_utp_request(GenericStream con, Yield yield)
417
418
419
420
421
422
423
424
{
    assert(_bep5_http_cache);
    Cancel cancel = _shutdown_signal;

    sys::error_code ec;

    http::request<http::empty_body> req;
    beast::flat_buffer buffer;
425
426
427
428
429
430
431
432
433
434

    {
        WatchDog watch_dog(_ctx , chrono::seconds(5) , [&] { con.close(); });

        http::async_read(con, buffer, req, yield[ec].tag("read"));

        if (!watch_dog.is_running()) {
            return or_throw(yield, asio::error::timed_out);
        }
    }
435
436
437

    if (ec || cancel) return;

438
439
440
441
442
443
444
    if (req.method() != http::verb::connect) {
        _bep5_http_cache->serve_local(req, con, cancel, yield[ec]);
        return;
    }

    // Connect to the injector and tunnel the transaction through it

445
    if (!_bep5_client) {
446
447
448
        return handle_bad_request(con, req, "No known injectors", yield[ec]);
    }

449
450
451
452
    auto inj = _bep5_client->connect( yield[ec].tag("connect_to_injector")
                                    , cancel
                                    , false
                                    , ouiservice::Bep5Client::injectors);
453
454
455
456
457
458
459
460
461
462
463

    if (cancel) ec = asio::error::operation_aborted;
    if (ec == asio::error::operation_aborted) return;
    if (ec) {
        ec = {};
        return handle_bad_request(con, req, "Failed to connect to injector", yield[ec]);
    }

    // Send the client an OK message indicating that the tunnel
    // has been established.
    http::response<http::empty_body> res{http::status::ok, req.version()};
464
465
    res.prepare_payload();

466
467
468
    // No ``res.prepare_payload()`` since no payload is allowed for CONNECT:
    // <https://tools.ietf.org/html/rfc7231#section-6.3.1>.

469
    http::async_write(con, res, yield[ec].tag("write"));
470
471
472
473

    if (cancel) ec = asio::error::operation_aborted;
    if (ec) return;

474
    full_duplex(move(con), move(inj), yield[ec].tag("full_duplex"));
475
476
}

Peter Jankuliak's avatar
Peter Jankuliak committed
477
//------------------------------------------------------------------------------
478
// Load the response for `request` from the distributed cache.
//
// Errors reported through `yield`:
//  - `operation_not_supported` if there is no cache client
//    or cache access is disabled in the configuration;
//  - `invalid_argument` if no cache key can be derived from `request`;
//  - `not_found` if the stored entry fails the trusted protocol version check;
//  - whatever the cache's `load` reports (or cancellation).
CacheEntry
Client::State::fetch_stored_in_dcache( const Request& request
                                     , const request_route::Config& request_config
                                     , const std::string& dht_group
                                     , Cancel& cancel
                                     , Yield yield)
{
    auto c = get_cache();

    const bool cache_is_disabled
        = !c
       || !_config.is_cache_access_enabled();

    if (cache_is_disabled) {
        return or_throw<CacheEntry>( yield
                                   , asio::error::operation_not_supported);
    }

    sys::error_code ec;

    auto key = key_from_http_req(request);
    // The key is dereferenced below, so guard it in release builds too
    // (an `assert` alone would leave that dereference unchecked there).
    if (!key) {
        return or_throw<CacheEntry>(yield, asio::error::invalid_argument);
    }

    auto s = c->load(move(*key), dht_group, cancel, yield[ec]);
    return_or_throw_on_error(yield, cancel, ec, CacheEntry{});

    auto& hdr = s.response_header();

    if (!util::http_proto_version_check_trusted(hdr, newest_proto_seen))
        // The cached resource cannot be used, treat it like
        // not being found.
        return or_throw<CacheEntry>(yield, asio::error::not_found);

    // Use the injection time stamp as the entry's date,
    // or "not a date" if it cannot be parsed as a number.
    auto tsh = util::http_injection_ts(hdr);
    auto ts = parse::number<time_t>(tsh);
    auto date = ( ts
                ? boost::posix_time::from_time_t(*ts)
                : boost::posix_time::not_a_date_time);

    maybe_add_proto_version_warning(hdr);
    assert(!hdr[http_::response_source_hdr].empty());  // for agent, set by cache
    return CacheEntry{date, move(s)};
}

521
//------------------------------------------------------------------------------
522
523
524
525
526
527
528
529
530
531
532
// Resolve and connect to the origin named by `rq`.
// If the request target starts with "https:" or "wss:",
// a TLS client handshake is also performed over the new connection.
// Errors and cancellation are reported through `yield`
// together with an empty stream.
GenericStream
Client::State::connect_to_origin( const Request& rq
                                , Cancel& cancel
                                , Yield yield)
{
    std::string host, port;
    std::tie(host, port) = util::get_host_port(rq);

    sys::error_code ec;

    auto lookup = util::tcp_async_resolve( host, port
                                         , _ctx.get_executor()
                                         , cancel
                                         , yield[ec]);

    return_or_throw_on_error(yield, cancel, ec, GenericStream());

    auto sock = connect_to_host(lookup, _ctx.get_executor(), cancel, yield[ec]);

    return_or_throw_on_error(yield, cancel, ec, GenericStream());

    GenericStream stream;

    if (rq.target().starts_with("https:") || rq.target().starts_with("wss:")) {
        stream = ssl::util::client_handshake( move(sock)
                                            , ssl_ctx
                                            , host
                                            , cancel
                                            , yield[ec]);

        return_or_throw_on_error(yield, cancel, ec, GenericStream());
    }
    else {
        stream = move(sock);
    }

    return stream;
}
//------------------------------------------------------------------------------
561
562
// Let the client front end answer `rq`, then mark the response
// as coming from the front end and mirror the request's keep-alive setting.
Response Client::State::fetch_fresh_from_front_end(const Request& rq, Yield yield)
{
    // Only known once the common UDP multiplexer has been created.
    boost::optional<uint32_t> udp_port;

    if (_udp_multiplexer) {
        udp_port = _udp_multiplexer->local_endpoint().port();
    }

    auto res = _front_end.serve( _config
                               , rq
                               , _bep5_http_cache.get()
                               , *_ca_certificate
                               , udp_port
                               , _upnps
                               , _udp_reachability.get()
                               , yield.tag("serve_frontend"));

    res.set( http_::response_source_hdr  // for agent
           , http_::response_source_hdr_front_end);

    res.keep_alive(rq.keep_alive());

    return res;
}
585

586
//------------------------------------------------------------------------------
587
// Fetch `rq` directly from its origin, reusing a pooled connection
// when one is available and connecting anew otherwise.
//
// `cancel` is taken by value: the watch dog below triggers this copy
// when the fetch time-out expires.  A cancellation that comes with an
// expired watch dog is reported as `timed_out`, otherwise as
// `operation_aborted`.
Session Client::State::fetch_fresh_from_origin(const Request& rq, Cancel cancel, Yield yield)
{
    WatchDog watch_dog(_ctx
                      , default_timeout::fetch_http()
                      , [&] { cancel(); });

    // Error to report once `cancel` has been triggered.
    auto cancel_reason = [&] () -> sys::error_code {
        if (watch_dog.is_running()) return asio::error::operation_aborted;
        return asio::error::timed_out;
    };

    sys::error_code ec;

    auto maybe_con = _origin_pools.get_connection(rq);
    OriginPools::Connection con;
    if (maybe_con) {
        con = std::move(*maybe_con);
    } else {
        auto stream = connect_to_origin(rq, cancel, yield[ec]);

        if (cancel) {
            assert(ec == asio::error::operation_aborted);
            ec = cancel_reason();
        }

        if (ec) return or_throw<Session>(yield, ec);

        con = _origin_pools.wrap(rq, std::move(stream));
    }

    // Transform request from absolute-form to origin-form
    // https://tools.ietf.org/html/rfc7230#section-5.3
    auto rq_ = util::req_form_from_absolute_to_origin(rq);

    // Send request
    {
        auto con_close = cancel.connect([&] { con.close(); });
        http::async_write(con, rq_, yield[ec].tag("send-origin-request"));
    }

    if (cancel) {
        ec = cancel_reason();
    }
    if (ec) return or_throw<Session>(yield, ec);

    auto ret = Session::create(std::move(con), cancel, yield[ec]);
    return_or_throw_on_error(yield, cancel, ec, Session());

    // Prevent others from inserting ouinet headers.
    util::remove_ouinet_fields_ref(ret.response_header());

    ret.response_header().set( http_::response_source_hdr  // for agent
                             , http_::response_source_hdr_origin);
    return ret;
}

641
//------------------------------------------------------------------------------
642
643
644
// Fetch `rq` from its origin through a CONNECT tunnel opened at the
// injector acting as a proxy.  For "https" URLs the TLS handshake with
// the origin is performed by this client over the tunnel.
//
// Errors reported through `yield` include `operation_not_supported`
// for a target that is not an HTTP(S) URL and `connection_refused`
// when the proxy does not answer the CONNECT request with "200 OK".
Session Client::State::fetch_fresh_through_connect_proxy( const Request& rq
                                                        , Cancel& cancel_
                                                        , Yield yield)
{
    // TODO: We're not re-using connections here. It's because the
    // ConnectionPool as it is right now can only work with http requests
    // and responses and thus can't be used for full-duplex forwarding.

    // Local cancel signal derived from the caller's;
    // the watch dog triggers it on time-out.
    Cancel cancel(cancel_);
    WatchDog watch_dog(_ctx, default_timeout::fetch_http(), [&]{ cancel(); });

    // Parse the URL to tell HTTP/HTTPS, host, port.
    util::url_match url;

    if (!match_http_url(rq.target(), url)) {
        // unsupported URL
        return or_throw<Session>(yield, asio::error::operation_not_supported);
    }

    // Connect to the injector/proxy.
    sys::error_code ec;

    auto inj = _injector->connect(yield[ec].tag("connect_to_injector"), cancel);

    if (ec) return or_throw<Session>(yield, ec);

    // Build the actual request to send to the proxy.
    Request connreq = { http::verb::connect
                      , url.host + ":" + (url.port.empty() ? "443" : url.port)
                      , 11 /* HTTP/1.1 */};

    // HTTP/1.1 requires a ``Host:`` header in all requests:
    // <https://tools.ietf.org/html/rfc7230#section-5.4>.
    connreq.set(http::field::host, connreq.target());

    if (auto credentials = _config.credentials_for(inj.remote_endpoint))
        connreq = authorize(connreq, *credentials);

    // Open a tunnel to the origin
    // (to later perform the SSL handshake and send the request).
    // Only get the head of the CONNECT response
    // (otherwise we would get stuck waiting to read
    // a body whose length we do not know
    // since the response should have no content length).
    auto connres = fetch_http<http::empty_body>
                        ( _ctx.get_executor()
                        , inj.connection
                        , connreq
                        , default_timeout::fetch_http()
                        , cancel
                        , yield[ec].tag("connreq"));

    // Do not look at `connres` if the exchange itself failed:
    // a default-constructed response carries a "200 OK" status and
    // would otherwise be mistaken for an established tunnel.
    if (ec) return or_throw<Session>(yield, ec);

    if (connres.result() != http::status::ok) {
        // This error code is quite fake, so log the error too.
        // Unfortunately there is no body to show.
        yield.tag("proxy_connect").log(connres);
        return or_throw<Session>(yield, asio::error::connection_refused);
    }

    GenericStream con;

    if (url.scheme == "https") {
        con = ssl::util::client_handshake( move(inj.connection)
                                         , ssl_ctx
                                         , url.host
                                         , cancel
                                         , yield[ec]);
    } else {
        con = move(inj.connection);
    }

    if (ec) return or_throw<Session>(yield, ec);

    // TODO: move
    auto rq_ = util::req_form_from_absolute_to_origin(rq);

    {
        auto slot = cancel.connect([&con] { con.close(); });
        http::async_write(con, rq_, yield[ec]);
        return_or_throw_on_error(yield, cancel, ec, Session());
    }

    auto session = Session::create(move(con), cancel, yield[ec]);
    return_or_throw_on_error(yield, cancel, ec, Session());

    // Prevent others from inserting ouinet headers.
    util::remove_ouinet_fields_ref(session.response_header());

    session.response_header().set( http_::response_source_hdr  // for agent
                                 , http_::response_source_hdr_proxy);
    return session;
}
734
//------------------------------------------------------------------------------
735
// Send `request` to the injector as a plain HTTP proxy request and
// return a session for the response.
//
// With `can_inject` the request is converted to an injector request
// and the response must pass the trusted protocol version check
// (otherwise `operation_not_supported` is reported); the response is
// then marked as coming from the injector.  Without it, Ouinet headers
// are stripped from the response and it is marked as coming from the proxy.
Session Client::State::fetch_fresh_through_simple_proxy
        ( Request request
        , bool can_inject
        , Cancel& cancel
        , Yield yield)
{
    sys::error_code ec;

    // Connect to the injector.
    ConnectionPool<Endpoint>::Connection con;
    if (_injector_connections.empty()) {
        if (log_transactions()) {
            yield.log("Connecting to the injector");
        }

        auto c = _injector->connect(yield[ec].tag("connect_to_injector2"), cancel);

        assert(!cancel || ec == asio::error::operation_aborted);

        if (ec) {
            if (log_transactions()) {
                yield.log("Failed to connect to injector ec:", ec.message());
            }
            return or_throw<Session>(yield, ec);
        }

        assert(c.connection.has_implementation());

        con = _injector_connections.wrap(std::move(c.connection));
        // Remember which injector endpoint this connection belongs to.
        *con = c.remote_endpoint;
    } else {
        if (log_transactions()) {
            yield.log("Reusing existing injector connection");
        }

        con = _injector_connections.pop_front();
    }

    // Close the connection if the operation gets cancelled
    // while the request is being sent.
    auto cancel_slot = cancel.connect([&] {
        con.close();
    });

    // Build the actual request to send to the injector.
    if (auto credentials = _config.credentials_for(*con))
        request = authorize(request, *credentials);

    if (can_inject) {
        // Keep the keep-alive setting across the conversion.
        bool keepalive = request.keep_alive();
        request = util::to_injector_request(move(request));
        request.keep_alive(keepalive);
    }

    if (log_transactions()) {
        yield.log("Sending a request to the injector");
    }

    // Send request
    http::async_write(con, request, yield[ec].tag("inj-request"));

    if (!ec && cancel_slot) {
        ec = asio::error::operation_aborted;
    }

    if (ec && log_transactions()) {
        yield.log("Failed to send request to the injector");
    }

    if (ec) return or_throw<Session>(yield, ec);

    if (log_transactions()) {
        yield.log("Reading response");
    }

    // From here on `Session::create` is handed `cancel` directly.
    cancel_slot = {};

    // Receive response
    auto session = Session::create(move(con), cancel, yield[ec]);

    auto& hdr = session.response_header();

    if (cancel)
        ec = asio::error::operation_aborted;
    else if ( !ec
            && can_inject
            && !util::http_proto_version_check_trusted(hdr, newest_proto_seen))
        // The injector using an unacceptable protocol version is treated like
        // the Injector mechanism being disabled.
        ec = asio::error::operation_not_supported;

    if (log_transactions()) {
        yield.log("End reading response. ec:", ec.message());
    }

    if (ec) return or_throw(yield, ec, std::move(session));

    // Tell the agent where the response came from.
    if (can_inject) {
        maybe_add_proto_version_warning(hdr);

        session.response_header().set( http_::response_source_hdr  // for agent
                                     , http_::response_source_hdr_injector);
    } else {
        // Prevent others from inserting ouinet headers.
        util::remove_ouinet_fields_ref(hdr);

        session.response_header().set( http_::response_source_hdr  // for agent
                                     , http_::response_source_hdr_proxy);
    }
    return session;
}

846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
class Transaction {
public:
    Transaction(GenericStream& ua_con, const Request& rq, UserAgentMetaData meta)
        : _ua_con(ua_con)
        , _request(rq)
        , _meta(std::move(meta))
    {}

    void write_to_user_agent(Session& session, Cancel& cancel, asio::yield_context yield)
    {
        namespace err = asio::error;

        if (cancel) {
            assert(!cancel);
            LOG_ERROR(__FILE__, ":", __LINE__, " Cancel already called");
            return or_throw(yield, err::operation_aborted);
        }

864
        if (_ua_was_written_to) {
865
866
867
868
869
            return or_throw(yield, err::already_started);
        }

        sys::error_code ec;

870
        _ua_was_written_to = true;
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
        session.flush_response(_ua_con, cancel, yield[ec]);

        bool keep_alive = !ec && _request.keep_alive() && session.keep_alive();

        if (!keep_alive) {
            session.close();
            _ua_con.close();
        }

        return or_throw(yield, ec);
    }

    template<class BodyType>
    void write_to_user_agent(const http::response<BodyType>& rs, Cancel& cancel, asio::yield_context yield)
    {
        namespace err = asio::error;