Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/MainDistributionPipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ concurrency:
jobs:
duckdb-stable-build:
name: Build extension binaries
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.4.3
with:
extension_name: httpfs
duckdb_version: v1.4.2
ci_tools_version: main
duckdb_version: v1.4.3
ci_tools_version: v1.4.3


duckdb-stable-deploy:
name: Deploy extension binaries
needs: duckdb-stable-build
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.4.3
secrets: inherit
with:
extension_name: httpfs
duckdb_version: v1.4.2
ci_tools_version: main
duckdb_version: v1.4.3
ci_tools_version: v1.4.3
deploy_latest: ${{ startsWith(github.ref, 'refs/heads/v') }}
deploy_versioned: ${{ startsWith(github.ref, 'refs/heads/v') || github.ref == 'refs/heads/main' }}
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 174 files
14 changes: 14 additions & 0 deletions src/create_secret_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ unique_ptr<BaseSecret> CreateS3SecretFunctions::CreateSecretFunctionInternal(Cli
secret->secret_map["bearer_token"] = named_param.second.ToString();
// Mark it as sensitive
secret->redact_keys.insert("bearer_token");
} else if (lower_name == "http_proxy") {
secret->secret_map["http_proxy"] = named_param.second;
} else if (lower_name == "http_proxy_password") {
secret->secret_map["http_proxy_password"] = named_param.second;
} else if (lower_name == "http_proxy_username") {
secret->secret_map["http_proxy_username"] = named_param.second;
} else if (lower_name == "extra_http_headers") {
secret->secret_map["extra_http_headers"] = named_param.second;
} else {
throw InvalidInputException("Unknown named parameter passed to CreateSecretFunctionInternal: " +
lower_name);
Expand Down Expand Up @@ -200,6 +208,12 @@ void CreateS3SecretFunctions::SetBaseNamedParams(CreateSecretFunction &function,
// Whether a secret refresh attempt should be made when the secret appears to be incorrect
function.named_parameters["refresh"] = LogicalType::VARCHAR;

// Params for HTTP configuration
function.named_parameters["http_proxy"] = LogicalType::VARCHAR;
function.named_parameters["http_proxy_password"] = LogicalType::VARCHAR;
function.named_parameters["http_proxy_username"] = LogicalType::VARCHAR;
function.named_parameters["extra_http_headers"] = LogicalType::MAP(LogicalType::VARCHAR, LogicalType::VARCHAR);

// Refresh Modes
// - auto
// - disabled
Expand Down
114 changes: 99 additions & 15 deletions src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <string>
#include <thread>

#include "s3fs.hpp"

namespace duckdb {

shared_ptr<HTTPUtil> HTTPFSUtil::GetHTTPUtil(optional_ptr<FileOpener> opener) {
Expand All @@ -34,7 +36,7 @@ unique_ptr<HTTPParams> HTTPFSUtil::InitializeParameters(optional_ptr<FileOpener>
auto result = make_uniq<HTTPFSParams>(*this);
result->Initialize(opener);

// No point in continueing without an opener
// No point in continuing without an opener
if (!opener) {
return std::move(result);
}
Expand Down Expand Up @@ -65,23 +67,41 @@ unique_ptr<HTTPParams> HTTPFSUtil::InitializeParameters(optional_ptr<FileOpener>
}
}

unique_ptr<KeyValueSecretReader> settings_reader;
if (info && !S3FileSystem::TryGetPrefix(info->file_path).empty()) {
// This is an S3-type url: look up secrets of the S3-compatible types, plus the http secret unless merging is disabled
const char *s3_secret_types[] = {"s3", "r2", "gcs", "aws", "http"};

idx_t secret_type_count = 5;
Value merge_http_secret_into_s3_request;
FileOpener::TryGetCurrentSetting(opener, "merge_http_secret_into_s3_request",
merge_http_secret_into_s3_request);

if (!merge_http_secret_into_s3_request.IsNull() && !merge_http_secret_into_s3_request.GetValue<bool>()) {
// Drop the http secret from the lookup
secret_type_count = 4;
}
settings_reader = make_uniq<KeyValueSecretReader>(*opener, info, s3_secret_types, secret_type_count);
} else {
settings_reader = make_uniq<KeyValueSecretReader>(*opener, info, "http");
}

// HTTP Secret lookups
KeyValueSecretReader settings_reader(*opener, info, "http");

string proxy_setting;
if (settings_reader.TryGetSecretKey<string>("http_proxy", proxy_setting) && !proxy_setting.empty()) {
if (settings_reader->TryGetSecretKey<string>("http_proxy", proxy_setting) && !proxy_setting.empty()) {
idx_t port;
string host;
HTTPUtil::ParseHTTPProxyHost(proxy_setting, host, port);
result->http_proxy = host;
result->http_proxy_port = port;
}
settings_reader.TryGetSecretKey<string>("http_proxy_username", result->http_proxy_username);
settings_reader.TryGetSecretKey<string>("http_proxy_password", result->http_proxy_password);
settings_reader.TryGetSecretKey<string>("bearer_token", result->bearer_token);
settings_reader->TryGetSecretKey<string>("http_proxy_username", result->http_proxy_username);
settings_reader->TryGetSecretKey<string>("http_proxy_password", result->http_proxy_password);
settings_reader->TryGetSecretKey<string>("bearer_token", result->bearer_token);

Value extra_headers;
if (settings_reader.TryGetSecretKey("extra_http_headers", extra_headers)) {
if (settings_reader->TryGetSecretKey("extra_http_headers", extra_headers)) {
auto children = MapValue::GetChildren(extra_headers);
for (const auto &child : children) {
auto kv = StructValue::GetChildren(child);
Expand Down Expand Up @@ -115,13 +135,22 @@ static void AddUserAgentIfAvailable(HTTPFSParams &http_params, HTTPHeaders &head
}
}

//! Merges the handle-level extra headers into the headers of an outgoing request.
//!
//! Every entry of `handle.http_params.extra_headers` is assigned into `header_map` through
//! `operator[]`, after which `pre_merged_headers` is set on the handle's params.
//!
//! \param handle     File handle whose http params supply the extra headers (and receive the flag)
//! \param header_map Header collection of the request being built; modified in place
static void AddHandleHeaders(HTTPFileHandle &handle, HTTPHeaders &header_map) {
	auto &params = handle.http_params;
	for (const auto &extra : params.extra_headers) {
		const auto &name = extra.first;
		const auto &value = extra.second;
		header_map[name] = value;
	}
	// Remember that the merge already happened for these params
	params.pre_merged_headers = true;
}

unique_ptr<HTTPResponse> HTTPFileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map,
string &buffer_out, char *buffer_in, idx_t buffer_in_len,
string params) {
auto &hfh = handle.Cast<HTTPFileHandle>();
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

PostRequestInfo post_request(url, header_map, hfh.http_params, const_data_ptr_cast(buffer_in), buffer_in_len);
auto result = http_util.Request(post_request);
Expand All @@ -135,6 +164,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::PutRequest(FileHandle &handle, string u
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

string content_type = "application/octet-stream";
PutRequestInfo put_request(url, header_map, hfh.http_params, (const_data_ptr_t)buffer_in, buffer_in_len,
Expand All @@ -147,6 +177,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::HeadRequest(FileHandle &handle, string
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

auto http_client = hfh.GetClient();

Expand All @@ -162,6 +193,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::DeleteRequest(FileHandle &handle, strin
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

auto http_client = hfh.GetClient();
DeleteRequestInfo delete_request(url, header_map, hfh.http_params);
Expand All @@ -187,6 +219,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::GetRequest(FileHandle &handle, string u
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

D_ASSERT(hfh.cached_file_handle);

Expand Down Expand Up @@ -238,6 +271,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::GetRangeRequest(FileHandle &handle, str
auto &http_util = hfh.http_params.http_util;

AddUserAgentIfAvailable(hfh.http_params, header_map);
AddHandleHeaders(hfh, header_map);

// send the Range header to read only subset of file
string range_expr = "bytes=" + to_string(file_offset) + "-" + to_string(file_offset + buffer_out_len - 1);
Expand Down Expand Up @@ -399,14 +433,49 @@ unique_ptr<FileHandle> HTTPFileSystem::OpenFileExtended(const OpenFileInfo &file
return std::move(handle);
}

//! Records one completed range request by appending it to `range_request_statistics`.
//!
//! \param read_offset   Offset that was requested
//! \param read_length   Number of bytes that were requested
//! \param read_duration Duration of the request, as measured by the caller
//!
//! NOTE(review): entries are only ever appended here; whether the history is trimmed elsewhere
//! is not visible from this function - confirm it cannot grow without bound on long-lived handles.
void HTTPFileHandle::AddStatistics(idx_t read_offset, idx_t read_length, idx_t read_duration) {
	auto &history = range_request_statistics;
	history.push_back({read_offset, read_length, read_duration});
}

//! Grows the read buffer when the handle is being read sequentially.
//!
//! The buffer is re-allocated at twice its current size, capped at MAXIMUM_READ_BUFFER_LEN, when
//! `next_read_offset` starts exactly where the most recently recorded range request ended.
//! Nothing happens when no request has been recorded yet, when the access pattern is not
//! sequential, or when the buffer has already reached the maximum size.
//!
//! The buffer is replaced by a fresh allocation; its previous contents are not copied over, so
//! this must only be called when the buffered data is no longer needed.
//!
//! \param next_read_offset File offset at which the next buffered range request will start
void HTTPFileHandle::AdaptReadBufferSize(idx_t next_read_offset) {
	D_ASSERT(!SkipBuffer());
	if (range_request_statistics.empty()) {
		return; // No requests yet - nothing to do
	}

	const auto &last_read = range_request_statistics.back();
	if (last_read.offset + last_read.length != next_read_offset) {
		return; // Not reading sequentially
	}

	const idx_t current_size = read_buffer.GetSize();
	if (current_size >= MAXIMUM_READ_BUFFER_LEN) {
		return; // Already at maximum size
	}

	// Grow the buffer, but never past the maximum: plain doubling overshoots whenever the current
	// size is larger than half of MAXIMUM_READ_BUFFER_LEN
	// TODO: can use statistics to estimate per-byte and round-trip cost using least squares, and do something smarter
	const idx_t new_size = MinValue<idx_t>(current_size * 2, MAXIMUM_READ_BUFFER_LEN);
	read_buffer = read_buffer.GetAllocator()->Allocate(new_size);
}

bool HTTPFileSystem::TryRangeRequest(FileHandle &handle, string url, HTTPHeaders header_map, idx_t file_offset,
char *buffer_out, idx_t buffer_out_len) {
auto &hfh = handle.Cast<HTTPFileHandle>();

const auto timestamp_before = Timestamp::GetCurrentTimestamp();
auto res = GetRangeRequest(handle, url, header_map, file_offset, buffer_out, buffer_out_len);

if (res) {
// Request succeeded TODO: fix upstream that 206 is not considered success
if (res->Success() || res->status == HTTPStatusCode::PartialContent_206 ||
res->status == HTTPStatusCode::Accepted_202) {

if (!hfh.flags.RequireParallelAccess()) {
// Update range request statistics
const auto duration =
NumericCast<idx_t>(Timestamp::GetCurrentTimestamp().value - timestamp_before.value);
hfh.AddStatistics(file_offset, buffer_out_len, duration);
}

return true;
}

Expand Down Expand Up @@ -438,6 +507,9 @@ bool HTTPFileSystem::ReadInternal(FileHandle &handle, void *buffer, int64_t nr_b
if (!hfh.cached_file_handle->Initialized()) {
throw InternalException("Cached file not initialized properly");
}
if (hfh.cached_file_handle->GetSize() < location + nr_bytes) {
throw InternalException("Cached file length can't satisfy the requested Read");
}
memcpy(buffer, hfh.cached_file_handle->GetData() + location, nr_bytes);
DUCKDB_LOG_FILE_SYSTEM_READ(handle, nr_bytes, location);
hfh.file_offset = location + nr_bytes;
Expand All @@ -448,8 +520,7 @@ bool HTTPFileSystem::ReadInternal(FileHandle &handle, void *buffer, int64_t nr_b
idx_t buffer_offset = 0;

// Don't buffer when DirectIO is set or when we are doing parallel reads
bool skip_buffer = hfh.flags.DirectIO() || hfh.flags.RequireParallelAccess();
if (skip_buffer && to_read > 0) {
if (hfh.SkipBuffer() && to_read > 0) {
if (!TryRangeRequest(hfh, hfh.path, {}, location, (char *)buffer, to_read)) {
return false;
}
Expand Down Expand Up @@ -494,7 +565,7 @@ bool HTTPFileSystem::ReadInternal(FileHandle &handle, void *buffer, int64_t nr_b
}

if (to_read > 0 && hfh.buffer_available == 0) {
auto new_buffer_available = MinValue<idx_t>(hfh.READ_BUFFER_LEN, hfh.length - start_offset);
auto new_buffer_available = MinValue<idx_t>(hfh.read_buffer.GetSize(), hfh.length - start_offset);

// Bypass buffer if we read more than buffer size
if (to_read > new_buffer_available) {
Expand All @@ -507,6 +578,8 @@ bool HTTPFileSystem::ReadInternal(FileHandle &handle, void *buffer, int64_t nr_b
start_offset += to_read;
break;
} else {
hfh.AdaptReadBufferSize(start_offset);
new_buffer_available = MinValue<idx_t>(hfh.read_buffer.GetSize(), hfh.length - start_offset);
if (!TryRangeRequest(hfh, hfh.path, {}, start_offset, (char *)hfh.read_buffer.get(),
new_buffer_available)) {
return false;
Expand Down Expand Up @@ -729,7 +802,8 @@ void HTTPFileHandle::LoadFileInfo() {
return;
} else {
// HEAD request fail, use Range request for another try (read only one byte)
if (flags.OpenForReading() && res->status != HTTPStatusCode::NotFound_404 && res->status != HTTPStatusCode::MovedPermanently_301) {
if (flags.OpenForReading() && res->status != HTTPStatusCode::NotFound_404 &&
res->status != HTTPStatusCode::MovedPermanently_301) {
auto range_res = hfs.GetRangeRequest(*this, path, {}, 0, nullptr, 2);
if (range_res->status != HTTPStatusCode::PartialContent_206 &&
range_res->status != HTTPStatusCode::Accepted_202 && range_res->status != HTTPStatusCode::OK_200) {
Expand Down Expand Up @@ -774,6 +848,14 @@ void HTTPFileHandle::TryAddLogger(FileOpener &opener) {
}
}

//! Allocates the initial read buffer of INITIAL_READ_BUFFER_LEN bytes.
//!
//! The allocation goes through the buffer allocator of the client context when the opener can
//! provide one, and through the default allocator otherwise. Must only be called for handles
//! that use a read buffer and that do not have one allocated yet (both are asserted).
//!
//! \param opener Optional file opener used to look up the client context
void HTTPFileHandle::AllocateReadBuffer(optional_ptr<FileOpener> opener) {
	D_ASSERT(!SkipBuffer());
	D_ASSERT(!read_buffer.IsSet());

	// Resolve the client context (if any) up front
	auto context = opener ? opener->TryGetClientContext() : optional_ptr<ClientContext>();
	Allocator &allocator = context ? BufferAllocator::Get(*context) : Allocator::DefaultAllocator();
	read_buffer = allocator.Allocate(INITIAL_READ_BUFFER_LEN);
}

void HTTPFileHandle::Initialize(optional_ptr<FileOpener> opener) {
auto &hfs = file_system.Cast<HTTPFileSystem>();
http_params.state = HTTPState::TryGetState(opener);
Expand Down Expand Up @@ -803,8 +885,8 @@ void HTTPFileHandle::Initialize(optional_ptr<FileOpener> opener) {
length = value.length;
etag = value.etag;

if (flags.OpenForReading()) {
read_buffer = duckdb::unique_ptr<data_t[]>(new data_t[READ_BUFFER_LEN]);
if (flags.OpenForReading() && !SkipBuffer()) {
AllocateReadBuffer(opener);
}
return;
}
Expand All @@ -822,8 +904,10 @@ void HTTPFileHandle::Initialize(optional_ptr<FileOpener> opener) {
current_cache->Insert(path, {length, last_modified, etag});
}

// Initialize the read buffer now that we know the file exists
read_buffer = duckdb::unique_ptr<data_t[]>(new data_t[READ_BUFFER_LEN]);
if (!SkipBuffer()) {
// Initialize the read buffer now that we know the file exists
AllocateReadBuffer(opener);
}
}

// If we're writing to a file, we might as well remove it from the cache
Expand Down
2 changes: 1 addition & 1 deletion src/httpfs_curl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class HTTPFSCurlClient : public HTTPClient {
Initialize(http_params);
}
void Initialize(HTTPParams &http_p) override {
HTTPFSParams &http_params = (HTTPFSParams&)http_p;
HTTPFSParams &http_params = (HTTPFSParams &)http_p;
auto bearer_token = "";
if (!http_params.bearer_token.empty()) {
bearer_token = http_params.bearer_token.c_str();
Expand Down
3 changes: 3 additions & 0 deletions src/httpfs_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ static void LoadInternal(ExtensionLoader &loader) {
config.AddExtensionOption("hf_max_per_page", "Debug option to limit number of items returned in list requests",
LogicalType::UBIGINT, Value::UBIGINT(0));

config.AddExtensionOption("merge_http_secret_into_s3_request", "Merges http secret params into S3 requests",
LogicalType::BOOLEAN, Value(true));

auto callback_httpfs_client_implementation = [](ClientContext &context, SetScope scope, Value &parameter) {
auto &config = DBConfig::GetConfig(context);
string value = StringValue::Get(parameter);
Expand Down
16 changes: 12 additions & 4 deletions src/httpfs_httplib_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class HTTPFSClient : public HTTPClient {
Initialize(http_params);
}
void Initialize(HTTPParams &http_p) override {
HTTPFSParams &http_params = (HTTPFSParams&)http_p;
HTTPFSParams &http_params = (HTTPFSParams &)http_p;
client->set_follow_location(http_params.follow_location);
client->set_keep_alive(http_params.keep_alive);
if (!http_params.ca_cert_file.empty()) {
Expand Down Expand Up @@ -106,18 +106,26 @@ class HTTPFSClient : public HTTPClient {
info.buffer_out += string(data, data_length);
return true;
};
// First assign body, this is the body that will be uploaded
req.body.assign(const_char_ptr_cast(info.buffer_in), info.buffer_in_len);
return TransformResult(client->send(req));
auto transformed_req = TransformResult(client->send(req));
// Then, after the actual request, re-assign body to the response value of the POST request
transformed_req->body.assign(const_char_ptr_cast(info.buffer_in), info.buffer_in_len);
return std::move(transformed_req);
}

private:
duckdb_httplib_openssl::Headers TransformHeaders(const HTTPHeaders &header_map, const HTTPParams &params) {
auto &httpfs_params = params.Cast<HTTPFSParams>();

duckdb_httplib_openssl::Headers headers;
for (auto &entry : header_map) {
headers.insert(entry);
}
for (auto &entry : params.extra_headers) {
headers.insert(entry);
if (!httpfs_params.pre_merged_headers) {
for (auto &entry : params.extra_headers) {
headers.insert(entry);
}
}
return headers;
}
Expand Down
Loading
Loading