diff --git a/README.md b/README.md index d8a86026..a225fe90 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![license](https://img.shields.io/github/license/couchbase/couchbase-ruby-client?color=brightgreen)](https://opensource.org/licenses/Apache-2.0) [![gem](https://img.shields.io/gem/v/couchbase?color=brightgreen)](https://rubygems.org/gems/couchbase) -[![commits](https://img.shields.io/github/commits-since/couchbase/couchbase-ruby-client/latest?color=brightgreen)](https://github.com/couchbase/couchbase-ruby-client/commits/master) +[![commits](https://img.shields.io/github/commits-since/couchbase/couchbase-ruby-client/latest?color=brightgreen)](https://github.com/couchbase/couchbase-ruby-client/commits/main) [![linters](https://img.shields.io/github/actions/workflow/status/couchbase/couchbase-ruby-client/linters.yml?branch=main&label=linters)](https://github.com/couchbase/couchbase-ruby-client/actions?query=workflow%3Alinters) This repository contains the third generation of the official Couchbase SDK for Ruby (aka. SDKv3) @@ -23,7 +23,7 @@ The library has been tested with MRI 3.0, 3.1 and 3.2. Supported platforms are L Add this line to your application's Gemfile: ```ruby -gem "couchbase", "3.4.3" +gem "couchbase", "3.4.4" ``` And then execute: diff --git a/examples/range_scan.rb b/examples/range_scan.rb new file mode 100644 index 00000000..c9f0dc6e --- /dev/null +++ b/examples/range_scan.rb @@ -0,0 +1,56 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +require 'couchbase' + +include Couchbase # rubocop:disable Style/MixinUsage for brevity + +options = Cluster::ClusterOptions.new +options.authenticate("Administrator", "password") +cluster = Cluster.connect("couchbase://localhost", options) + +bucket = cluster.bucket("travel-sample") +collection = bucket.scope("inventory").collection("airline") + +options = Options::Scan.new(ids_only: false) + +puts "Range Scan (from 'airline_1' to 'airline_11')" + +scan_type = RangeScan.new( + from: ScanTerm.new("airline_1"), + to: ScanTerm.new("airline_11", exclusive: true) +) +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end + +puts "\nPrefix Scan (with prefix 'airline_8')" + +scan_type = PrefixScan.new("airline_8") +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end + +puts "\nSampling Scan (with limit 5)" + +scan_type = SamplingScan.new(5) +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end diff --git a/ext/couchbase b/ext/couchbase index 2c6a0654..43cf66a5 160000 --- a/ext/couchbase +++ b/ext/couchbase @@ -1 +1 @@ -Subproject commit 2c6a065499329f39963cc3a45c87f58c6bea5f28 +Subproject commit 43cf66a592d1f8112141a73e5a563d7187ee0ee6 diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index c6b3000e..baceab88 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -33,9 +33,12 @@ #include #include + +#include #include #include #include + #include #include #include @@ -45,6 +48,12 @@ #include #include +#include + +#include +#include +#include + #include #include #include @@ -435,7 +444,7 @@ cb_backend_close(cb_backend_data* backend) static void cb_Backend_mark(void* /* ptr */) { - /* no embeded ruby objects -- no mark */ + /* no embedded ruby objects -- no mark */ } static void @@ -495,6 +504,8 @@ cb_backend_to_cluster(VALUE self) return backend->cluster; } +static VALUE eCouchbaseError; + static VALUE eAmbiguousTimeout; static VALUE eAuthenticationFailure; static VALUE eBucketExists; @@ -534,6 +545,7 @@ static VALUE eInvalidArgument; static VALUE eJobQueueFull; static VALUE eLinkNotFound; static VALUE eLinkExists; +static VALUE eMutationTokenOutdated; static VALUE eNumberTooBig; static VALUE eParsingFailure; static VALUE ePathExists; @@ -586,7 +598,7 @@ static void init_exceptions(VALUE mCouchbase) { VALUE mError = rb_define_module_under(mCouchbase, "Error"); - VALUE eCouchbaseError = rb_define_class_under(mError, "CouchbaseError", rb_eStandardError); + eCouchbaseError = rb_define_class_under(mError, "CouchbaseError", rb_eStandardError); VALUE eTimeout = rb_define_class_under(mError, "Timeout", eCouchbaseError); @@ -629,6 +641,7 @@ init_exceptions(VALUE mCouchbase) eJobQueueFull = rb_define_class_under(mError, "JobQueueFull", eCouchbaseError); eLinkNotFound = rb_define_class_under(mError, "LinkNotFound", eCouchbaseError); eLinkExists = rb_define_class_under(mError, "LinkExists", eCouchbaseError); + eMutationTokenOutdated = rb_define_class_under(mError, "MutationTokenOutdated", eCouchbaseError); eNumberTooBig = rb_define_class_under(mError, "NumberTooBig", eCouchbaseError); eParsingFailure = rb_define_class_under(mError, "ParsingFailure", eCouchbaseError); ePathExists = rb_define_class_under(mError, "PathExists", eCouchbaseError); @@ -784,6 +797,9 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e 
case couchbase::errc::key_value::durable_write_re_commit_in_progress: return rb_exc_new_cstr(eDurableWriteReCommitInProgress, what.c_str()); + case couchbase::errc::key_value::mutation_token_outdated: + return rb_exc_new_cstr(eMutationTokenOutdated, what.c_str()); + case couchbase::errc::key_value::path_not_found: return rb_exc_new_cstr(ePathNotFound, what.c_str()); @@ -834,6 +850,10 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e case couchbase::errc::key_value::cannot_revive_living_document: return rb_exc_new_cstr(eCannotReviveLivingDocument, what.c_str()); + + case couchbase::errc::key_value::range_scan_completed: + // Should not be exposed to the Ruby SDK, map it to a BackendError + return rb_exc_new_cstr(eBackendError, what.c_str()); } } else if (ec.category() == couchbase::core::impl::query_category()) { switch (static_cast(ec.value())) { @@ -3409,7 +3429,7 @@ cb_Backend_document_decrement(VALUE self, VALUE bucket, VALUE scope, VALUE colle static VALUE cb_Backend_document_lookup_in(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE id, VALUE specs, VALUE options) { - const auto& core = cb_backend_to_cluster(self); + const auto& cluster = cb_backend_to_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -3425,10 +3445,6 @@ cb_Backend_document_lookup_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle } try { - couchbase::lookup_in_options opts; - couchbase::ruby::set_timeout(opts, options); - couchbase::ruby::set_access_deleted(opts, options); - couchbase::core::document_id doc_id{ cb_string_new(bucket), cb_string_new(scope), @@ -3436,12 +3452,15 @@ cb_Backend_document_lookup_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle cb_string_new(id), }; + couchbase::core::operations::lookup_in_request req{ doc_id }; + cb_extract_timeout(req, options); + cb_extract_option_bool(req.access_deleted, options, "access_deleted"); + static VALUE xattr_property = rb_id2sym(rb_intern("xattr")); static VALUE path_property = rb_id2sym(rb_intern("path")); static VALUE opcode_property = rb_id2sym(rb_intern("opcode")); auto entries_size = static_cast(RARRAY_LEN(specs)); - couchbase::lookup_in_specs cxx_specs; for (std::size_t i = 0; i < entries_size; ++i) { VALUE entry = rb_ary_entry(specs, static_cast(i)); cb_check_type(entry, T_HASH); @@ -3450,29 +3469,138 @@ cb_Backend_document_lookup_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle bool xattr = RTEST(rb_hash_aref(entry, xattr_property)); VALUE path = rb_hash_aref(entry, path_property); cb_check_type(path, T_STRING); + auto opcode = couchbase::core::impl::subdoc::opcode{}; if (ID operation_id = rb_sym2id(operation); operation_id == rb_intern("get_doc")) { - cxx_specs.push_back(couchbase::lookup_in_specs::get("").xattr(xattr)); + opcode = couchbase::core::impl::subdoc::opcode::get_doc; } else if (operation_id == rb_intern("get")) { - cxx_specs.push_back(couchbase::lookup_in_specs::get(cb_string_new(path)).xattr(xattr)); + opcode = couchbase::core::impl::subdoc::opcode::get; } else if (operation_id == rb_intern("exists")) { - cxx_specs.push_back(couchbase::lookup_in_specs::exists(cb_string_new(path)).xattr(xattr)); + opcode = couchbase::core::impl::subdoc::opcode::exists; } else if (operation_id == rb_intern("count")) { - cxx_specs.push_back(couchbase::lookup_in_specs::count(cb_string_new(path)).xattr(xattr)); + opcode = couchbase::core::impl::subdoc::opcode::get_count; } else { throw ruby_exception(eInvalidArgument, rb_sprintf("unsupported operation for subdocument 
lookup: %+" PRIsVALUE, operation)); } cb_check_type(path, T_STRING); + + req.specs.emplace_back(couchbase::core::impl::subdoc::command{ + opcode, cb_string_new(path), {}, couchbase::core::impl::subdoc::build_lookup_in_path_flags(xattr) }); } - auto f = couchbase::cluster(core) - .bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .lookup_in(cb_string_new(id), cxx_specs, opts); + auto barrier = std::make_shared>(); + auto f = barrier->get_future(); + cluster->execute(req, [barrier](couchbase::core::operations::lookup_in_response&& resp) { barrier->set_value(std::move(resp)); }); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error_code(resp.ctx, "unable to perform lookup_in operation"); + } - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error_code(ctx, "unable to lookup_in"); + static VALUE deleted_property = rb_id2sym(rb_intern("deleted")); + static VALUE fields_property = rb_id2sym(rb_intern("fields")); + static VALUE index_property = rb_id2sym(rb_intern("index")); + static VALUE exists_property = rb_id2sym(rb_intern("exists")); + static VALUE cas_property = rb_id2sym(rb_intern("cas")); + static VALUE value_property = rb_id2sym(rb_intern("value")); + static VALUE error_property = rb_id2sym(rb_intern("error")); + + VALUE res = rb_hash_new(); + rb_hash_aset(res, cas_property, cb_cas_to_num(resp.cas)); + VALUE fields = rb_ary_new_capa(static_cast(entries_size)); + rb_hash_aset(res, fields_property, fields); + rb_hash_aset(res, deleted_property, resp.deleted ? Qtrue : Qfalse); + for (std::size_t i = 0; i < entries_size; ++i) { + auto resp_entry = resp.fields.at(i); + VALUE entry = rb_hash_new(); + rb_hash_aset(entry, index_property, ULL2NUM(resp_entry.original_index)); + rb_hash_aset(entry, exists_property, resp_entry.exists ? 
Qtrue : Qfalse); + rb_hash_aset(entry, path_property, cb_str_new(resp_entry.path)); + if (!resp_entry.value.empty()) { + rb_hash_aset(entry, value_property, cb_str_new(resp_entry.value)); + } + if (resp_entry.ec && resp_entry.ec != couchbase::errc::key_value::path_not_found) { + rb_hash_aset(entry, + error_property, + cb_map_error_code(resp_entry.ec, + fmt::format("error getting result for spec at index {}, path \"{}\"", i, resp_entry.path))); + } + rb_ary_store(fields, static_cast(i), entry); + } + return res; + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + +static VALUE +cb_Backend_document_lookup_in_any_replica(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE id, VALUE specs, VALUE options) +{ + const auto& cluster = cb_backend_to_cluster(self); + + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); + Check_Type(id, T_STRING); + Check_Type(specs, T_ARRAY); + if (RARRAY_LEN(specs) <= 0) { + rb_raise(rb_eArgError, "Array with specs cannot be empty"); + return Qnil; + } + if (!NIL_P(options)) { + Check_Type(options, T_HASH); + } + + try { + couchbase::core::document_id doc_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }; + + couchbase::core::operations::lookup_in_any_replica_request req{ doc_id }; + cb_extract_timeout(req, options); + + static VALUE xattr_property = rb_id2sym(rb_intern("xattr")); + static VALUE path_property = rb_id2sym(rb_intern("path")); + static VALUE opcode_property = rb_id2sym(rb_intern("opcode")); + + auto entries_size = static_cast(RARRAY_LEN(specs)); + for (std::size_t i = 0; i < entries_size; ++i) { + VALUE entry = rb_ary_entry(specs, static_cast(i)); + cb_check_type(entry, T_HASH); + VALUE operation = rb_hash_aref(entry, opcode_property); + cb_check_type(operation, T_SYMBOL); + bool xattr = RTEST(rb_hash_aref(entry, xattr_property)); + VALUE path = rb_hash_aref(entry, path_property); + cb_check_type(path, T_STRING); + auto opcode = couchbase::core::impl::subdoc::opcode{}; + if (ID operation_id = rb_sym2id(operation); operation_id == rb_intern("get_doc")) { + opcode = couchbase::core::impl::subdoc::opcode::get_doc; + } else if (operation_id == rb_intern("get")) { + opcode = couchbase::core::impl::subdoc::opcode::get; + } else if (operation_id == rb_intern("exists")) { + opcode = couchbase::core::impl::subdoc::opcode::exists; + } else if (operation_id == rb_intern("count")) { + opcode = couchbase::core::impl::subdoc::opcode::get_count; + } else { + throw ruby_exception(eInvalidArgument, rb_sprintf("unsupported operation for subdocument lookup: %+" PRIsVALUE, operation)); + } + cb_check_type(path, T_STRING); + + req.specs.emplace_back(couchbase::core::impl::subdoc::command{ + opcode, cb_string_new(path), {}, couchbase::core::impl::subdoc::build_lookup_in_path_flags(xattr) }); + } + + auto barrier = std::make_shared>(); + auto f = barrier->get_future(); + cluster->execute( + req, [barrier](couchbase::core::operations::lookup_in_any_replica_response&& resp) { barrier->set_value(std::move(resp)); }); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error_code(resp.ctx, "unable to perform lookup_in_any_replica operation"); } static VALUE deleted_property = rb_id2sym(rb_intern("deleted")); @@ -3481,23 +3609,153 @@ 
cb_Backend_document_lookup_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle static VALUE exists_property = rb_id2sym(rb_intern("exists")); static VALUE cas_property = rb_id2sym(rb_intern("cas")); static VALUE value_property = rb_id2sym(rb_intern("value")); + static VALUE error_property = rb_id2sym(rb_intern("error")); + static VALUE is_replica_property = rb_id2sym(rb_intern("is_replica")); VALUE res = rb_hash_new(); - rb_hash_aset(res, cas_property, cb_cas_to_num(resp.cas())); + rb_hash_aset(res, cas_property, cb_cas_to_num(resp.cas)); VALUE fields = rb_ary_new_capa(static_cast(entries_size)); rb_hash_aset(res, fields_property, fields); - rb_hash_aset(res, deleted_property, resp.is_deleted() ? Qtrue : Qfalse); + rb_hash_aset(res, deleted_property, resp.deleted ? Qtrue : Qfalse); + rb_hash_aset(res, is_replica_property, resp.is_replica ? Qtrue : Qfalse); + for (std::size_t i = 0; i < entries_size; ++i) { + auto resp_entry = resp.fields.at(i); VALUE entry = rb_hash_new(); - rb_hash_aset(entry, index_property, ULL2NUM(i)); - rb_hash_aset(entry, exists_property, resp.exists(i) ? Qtrue : Qfalse); - rb_hash_aset(entry, path_property, rb_hash_aref(rb_ary_entry(specs, static_cast(i)), path_property)); - if (resp.has_value(i)) { - auto value = resp.content_as(i); - rb_hash_aset(entry, value_property, cb_str_new(couchbase::core::utils::json::generate(value))); + rb_hash_aset(entry, index_property, ULL2NUM(resp_entry.original_index)); + rb_hash_aset(entry, exists_property, resp_entry.exists ? Qtrue : Qfalse); + rb_hash_aset(entry, path_property, cb_str_new(resp_entry.path)); + if (!resp_entry.value.empty()) { + rb_hash_aset(entry, value_property, cb_str_new(resp_entry.value)); + } + if (resp_entry.ec && resp_entry.ec != couchbase::errc::key_value::path_not_found) { + rb_hash_aset(entry, + error_property, + cb_map_error_code(resp_entry.ec, + fmt::format("error getting result for spec at index {}, path \"{}\"", i, resp_entry.path))); } rb_ary_store(fields, static_cast(i), entry); } + + return res; + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + +static VALUE +cb_Backend_document_lookup_in_all_replicas(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE id, VALUE specs, VALUE options) +{ + const auto& cluster = cb_backend_to_cluster(self); + + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); + Check_Type(id, T_STRING); + Check_Type(specs, T_ARRAY); + if (RARRAY_LEN(specs) <= 0) { + rb_raise(rb_eArgError, "Array with specs cannot be empty"); + return Qnil; + } + if (!NIL_P(options)) { + Check_Type(options, T_HASH); + } + + try { + couchbase::core::document_id doc_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }; + + couchbase::core::operations::lookup_in_all_replicas_request req{ doc_id }; + cb_extract_timeout(req, options); + + static VALUE xattr_property = rb_id2sym(rb_intern("xattr")); + static VALUE path_property = rb_id2sym(rb_intern("path")); + static VALUE opcode_property = rb_id2sym(rb_intern("opcode")); + + auto entries_size = static_cast(RARRAY_LEN(specs)); + for (std::size_t i = 0; i < entries_size; ++i) { + VALUE entry = rb_ary_entry(specs, static_cast(i)); + cb_check_type(entry, T_HASH); + VALUE operation = rb_hash_aref(entry, opcode_property); + 
cb_check_type(operation, T_SYMBOL); + bool xattr = RTEST(rb_hash_aref(entry, xattr_property)); + VALUE path = rb_hash_aref(entry, path_property); + cb_check_type(path, T_STRING); + auto opcode = couchbase::core::impl::subdoc::opcode{}; + if (ID operation_id = rb_sym2id(operation); operation_id == rb_intern("get_doc")) { + opcode = couchbase::core::impl::subdoc::opcode::get_doc; + } else if (operation_id == rb_intern("get")) { + opcode = couchbase::core::impl::subdoc::opcode::get; + } else if (operation_id == rb_intern("exists")) { + opcode = couchbase::core::impl::subdoc::opcode::exists; + } else if (operation_id == rb_intern("count")) { + opcode = couchbase::core::impl::subdoc::opcode::get_count; + } else { + throw ruby_exception(eInvalidArgument, rb_sprintf("unsupported operation for subdocument lookup: %+" PRIsVALUE, operation)); + } + cb_check_type(path, T_STRING); + + req.specs.emplace_back(couchbase::core::impl::subdoc::command{ + opcode, cb_string_new(path), {}, couchbase::core::impl::subdoc::build_lookup_in_path_flags(xattr) }); + } + + auto barrier = std::make_shared>(); + auto f = barrier->get_future(); + cluster->execute( + req, [barrier](couchbase::core::operations::lookup_in_all_replicas_response&& resp) { barrier->set_value(std::move(resp)); }); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error_code(resp.ctx, "unable to perform lookup_in_all_replicas operation"); + } + + static VALUE deleted_property = rb_id2sym(rb_intern("deleted")); + static VALUE fields_property = rb_id2sym(rb_intern("fields")); + static VALUE index_property = rb_id2sym(rb_intern("index")); + static VALUE exists_property = rb_id2sym(rb_intern("exists")); + static VALUE cas_property = rb_id2sym(rb_intern("cas")); + static VALUE value_property = rb_id2sym(rb_intern("value")); + static VALUE error_property = rb_id2sym(rb_intern("error")); + static VALUE is_replica_property = rb_id2sym(rb_intern("is_replica")); + + auto lookup_in_entries_size = resp.entries.size(); + VALUE res = rb_ary_new_capa(static_cast(lookup_in_entries_size)); + for (std::size_t j = 0; j < lookup_in_entries_size; ++j) { + auto lookup_in_entry = resp.entries.at(j); + VALUE lookup_in_entry_res = rb_hash_new(); + rb_hash_aset(lookup_in_entry_res, cas_property, cb_cas_to_num(lookup_in_entry.cas)); + VALUE fields = rb_ary_new_capa(static_cast(entries_size)); + rb_hash_aset(lookup_in_entry_res, fields_property, fields); + rb_hash_aset(lookup_in_entry_res, deleted_property, lookup_in_entry.deleted ? Qtrue : Qfalse); + rb_hash_aset(lookup_in_entry_res, is_replica_property, lookup_in_entry.is_replica ? Qtrue : Qfalse); + + for (std::size_t i = 0; i < entries_size; ++i) { + auto field_entry = lookup_in_entry.fields.at(i); + VALUE entry = rb_hash_new(); + rb_hash_aset(entry, index_property, ULL2NUM(field_entry.original_index)); + rb_hash_aset(entry, exists_property, field_entry.exists ? 
Qtrue : Qfalse); + rb_hash_aset(entry, path_property, cb_str_new(field_entry.path)); + if (!field_entry.value.empty()) { + rb_hash_aset(entry, value_property, cb_str_new(field_entry.value)); + } + if (field_entry.ec && field_entry.ec != couchbase::errc::key_value::path_not_found) { + rb_hash_aset( + entry, + error_property, + cb_map_error_code(field_entry.ec, + fmt::format("error getting result for spec at index {}, path \"{}\"", i, field_entry.path))); + } + rb_ary_store(fields, static_cast(i), entry); + } + rb_ary_store(res, static_cast(j), lookup_in_entry_res); + } + return res; } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -3661,6 +3919,284 @@ cb_Backend_document_mutate_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle return Qnil; } +struct cb_core_scan_result_data { + std::unique_ptr scan_result{}; +}; + +static void +cb_CoreScanResult_mark(void* ptr) +{ + /* No embedded Ruby objects */ +} + +static void +cb_CoreScanResult_free(void* ptr) +{ + auto* data = static_cast(ptr); + if (data->scan_result != nullptr && !data->scan_result->is_cancelled()) { + data->scan_result->cancel(); + } + data->scan_result.reset(); + ruby_xfree(data); +} + +static const rb_data_type_t cb_core_scan_result_type { + .wrap_struct_name = "Couchbase/Backend/CoreScanResult", + .function = { + .dmark = cb_CoreScanResult_mark, + .dfree = cb_CoreScanResult_free, + }, + .data = nullptr, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY + .flags = RUBY_TYPED_FREE_IMMEDIATELY, +#endif +}; + +static VALUE +cb_CoreScanResult_allocate(VALUE klass) +{ + cb_core_scan_result_data* data = nullptr; + VALUE obj = TypedData_Make_Struct(klass, cb_core_scan_result_data, &cb_core_scan_result_type, data); + return obj; +} + +static VALUE +cb_CoreScanResult_is_cancelled(VALUE self) +{ + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + auto resp = data->scan_result->is_cancelled(); + if (resp) { + return Qtrue; + } else { + return Qfalse; + } +} + +static VALUE +cb_CoreScanResult_cancel(VALUE self) +{ + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + data->scan_result->cancel(); + return Qnil; +} + +static VALUE +cb_CoreScanResult_next_item(VALUE self) +{ + try { + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + auto barrier = std::make_shared>>(); + auto f = barrier->get_future(); + data->scan_result->next([barrier](couchbase::core::range_scan_item item, std::error_code ec) { + if (ec) { + return barrier->set_value(tl::unexpected(ec)); + } else { + return barrier->set_value(item); + } + }); + auto resp = cb_wait_for_future(f); + if (!resp.has_value()) { + // If the error code is range_scan_completed return nil without raising an exception (nil signifies that there + // are no more items) + if (resp.error() != couchbase::errc::key_value::range_scan_completed) { + cb_throw_error_code(resp.error(), "unable to fetch next scan item"); + } + // Release ownership of scan_result unique pointer + return Qnil; + } + auto item = resp.value(); + VALUE res = rb_hash_new(); + rb_hash_aset(res, rb_id2sym(rb_intern("id")), cb_str_new(item.key)); + if (item.body.has_value()) { + auto body = item.body.value(); + rb_hash_aset(res, rb_id2sym(rb_intern("id")), cb_str_new(item.key)); + rb_hash_aset(res, 
rb_id2sym(rb_intern("encoded")), cb_str_new(body.value)); + rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(body.cas)); + rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(body.flags)); + rb_hash_aset(res, rb_id2sym(rb_intern("expiry")), UINT2NUM(body.expiry)); + rb_hash_aset(res, rb_id2sym(rb_intern("id_only")), Qfalse); + } else { + rb_hash_aset(res, rb_id2sym(rb_intern("id_only")), Qtrue); + } + return res; + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + +static VALUE +cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE scan_type, VALUE options) +{ + const auto& cluster = cb_backend_to_cluster(self); + + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); + Check_Type(scan_type, T_HASH); + if (!NIL_P(options)) { + Check_Type(options, T_HASH); + } + + try { + couchbase::core::range_scan_orchestrator_options orchestrator_options{}; + cb_extract_timeout(orchestrator_options, options); + cb_extract_option_bool(orchestrator_options.ids_only, options, "ids_only"); + cb_extract_option_number(orchestrator_options.batch_item_limit, options, "batch_item_limit"); + cb_extract_option_number(orchestrator_options.batch_byte_limit, options, "batch_byte_limit"); + cb_extract_option_number(orchestrator_options.concurrency, options, "concurrency"); + + // Extracting the mutation state + if (VALUE mutation_state = rb_hash_aref(options, rb_id2sym(rb_intern("mutation_state"))); !NIL_P(mutation_state)) { + cb_check_type(mutation_state, T_ARRAY); + auto state_size = static_cast(RARRAY_LEN(mutation_state)); + + if (state_size > 0) { + auto core_mut_state = couchbase::core::mutation_state{}; + core_mut_state.tokens.reserve(state_size); + for (std::size_t i = 0; i < state_size; ++i) { + VALUE token = rb_ary_entry(mutation_state, static_cast(i)); + cb_check_type(token, T_HASH); + VALUE bucket_name = rb_hash_aref(token, rb_id2sym(rb_intern("bucket_name"))); + cb_check_type(bucket_name, T_STRING); + VALUE partition_id = rb_hash_aref(token, rb_id2sym(rb_intern("partition_id"))); + cb_check_type(partition_id, T_FIXNUM); + VALUE partition_uuid = rb_hash_aref(token, rb_id2sym(rb_intern("partition_uuid"))); + switch (TYPE(partition_uuid)) { + case T_FIXNUM: + case T_BIGNUM: + break; + default: + rb_raise(rb_eArgError, "partition_uuid must be an Integer"); + } + VALUE sequence_number = rb_hash_aref(token, rb_id2sym(rb_intern("sequence_number"))); + switch (TYPE(sequence_number)) { + case T_FIXNUM: + case T_BIGNUM: + break; + default: + rb_raise(rb_eArgError, "sequence_number must be an Integer"); + } + core_mut_state.tokens.emplace_back(NUM2ULL(partition_uuid), + NUM2ULL(sequence_number), + gsl::narrow_cast(NUM2UINT(partition_id)), + cb_string_new(bucket_name)); + } + + orchestrator_options.consistent_with = core_mut_state; + } + } + + auto bucket_name = cb_string_new(bucket); + auto scope_name = cb_string_new(scope); + auto collection_name = cb_string_new(collection); + + // Getting the operation agent + auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { cluster } }); + agent_group.open_bucket(bucket_name); + auto agent = agent_group.get_agent(bucket_name); + if (!agent.has_value()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. 
Unable to get operation agent"); + return Qnil; + } + + // Getting the vbucket map + auto barrier = std::make_shared>>(); + auto f = barrier->get_future(); + cluster->with_bucket_configuration(bucket_name, + [barrier](std::error_code ec, const couchbase::core::topology::configuration& config) mutable { + if (ec) { + return barrier->set_value(tl::unexpected(ec)); + } + barrier->set_value(config); + }); + auto config = cb_wait_for_future(f); + if (!config.has_value()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get bucket configuration"); + return Qnil; + } + if (!config->supports_range_scan()) { + rb_raise(eFeatureNotAvailable, "Server does not support key-value scan operations"); + return Qnil; + } + auto vbucket_map = config->vbmap; + if (!vbucket_map || vbucket_map->empty()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get vbucket map"); + return Qnil; + } + + // Constructing the scan type + std::variant + core_scan_type{}; + ID scan_type_id = rb_sym2id(rb_hash_aref(scan_type, rb_id2sym(rb_intern("scan_type")))); + if (scan_type_id == rb_intern("range")) { + auto range_scan = couchbase::core::range_scan{}; + + VALUE from_hash = rb_hash_aref(scan_type, rb_id2sym(rb_intern("from"))); + VALUE to_hash = rb_hash_aref(scan_type, rb_id2sym(rb_intern("to"))); + + if (!NIL_P(from_hash)) { + Check_Type(from_hash, T_HASH); + range_scan.from = couchbase::core::scan_term{}; + cb_extract_option_string(range_scan.from->term, from_hash, "term"); + cb_extract_option_bool(range_scan.from->exclusive, from_hash, "exclusive"); + } + if (!NIL_P(to_hash)) { + Check_Type(to_hash, T_HASH); + range_scan.to = couchbase::core::scan_term{}; + cb_extract_option_string(range_scan.to->term, to_hash, "term"); + cb_extract_option_bool(range_scan.to->exclusive, to_hash, "exclusive"); + } + core_scan_type = range_scan; + } else if (scan_type_id == rb_intern("prefix")) { + auto prefix_scan = couchbase::core::prefix_scan{}; + cb_extract_option_string(prefix_scan.prefix, scan_type, "prefix"); + core_scan_type = prefix_scan; + } else if (scan_type_id == rb_intern("sampling")) { + auto sampling_scan = couchbase::core::sampling_scan{}; + cb_extract_option_number(sampling_scan.limit, scan_type, "limit"); + cb_extract_option_number(sampling_scan.seed, scan_type, "seed"); + core_scan_type = sampling_scan; + } else { + rb_raise(eInvalidArgument, "Invalid scan operation type"); + } + + auto orchestrator = couchbase::core::range_scan_orchestrator( + cluster->io_context(), agent.value(), vbucket_map.value(), scope_name, collection_name, core_scan_type, orchestrator_options); + + // Start the scan + auto resp = orchestrator.scan(); + if (!resp.has_value()) { + cb_throw_error_code(resp.error(), "unable to start scan"); + } + + // Wrap core scan_result inside Ruby ScanResult + // Creating a Ruby CoreScanResult object *after* checking that no error occurred during orchestrator.scan() + VALUE cCoreScanResult = rb_define_class_under(rb_define_module("Couchbase"), "CoreScanResult", rb_cObject); + rb_define_alloc_func(cCoreScanResult, cb_CoreScanResult_allocate); + rb_define_method(cCoreScanResult, "next_item", VALUE_FUNC(cb_CoreScanResult_next_item), 0); + rb_define_method(cCoreScanResult, "cancelled?", VALUE_FUNC(cb_CoreScanResult_is_cancelled), 0); + rb_define_method(cCoreScanResult, "cancel", VALUE_FUNC(cb_CoreScanResult_cancel), 0); + VALUE core_scan_result_obj = rb_class_new_instance(0, NULL, cCoreScanResult); + rb_ivar_set(core_scan_result_obj, rb_intern("@backend"), self); + 
cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(core_scan_result_obj, cb_core_scan_result_data, &cb_core_scan_result_type, data); + data->scan_result = std::make_unique(resp.value()); + return core_scan_result_obj; + + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + static int cb_for_each_named_param(VALUE key, VALUE value, VALUE arg) { @@ -3696,6 +4232,7 @@ cb_Backend_document_query(VALUE self, VALUE statement, VALUE options) cb_extract_option_bool(req.readonly, options, "readonly"); cb_extract_option_bool(req.flex_index, options, "flex_index"); cb_extract_option_bool(req.preserve_expiry, options, "preserve_expiry"); + cb_extract_option_bool(req.use_replica, options, "use_replica"); cb_extract_option_uint64(req.scan_cap, options, "scan_cap"); cb_extract_duration(req.scan_wait, options, "scan_wait"); cb_extract_option_uint64(req.max_parallelism, options, "max_parallelism"); @@ -8577,10 +9114,20 @@ cb_Backend_form_encode(VALUE self, VALUE data) return cb_str_new(encoded); } +static VALUE +cb_Backend_enable_protocol_logger_to_save_network_traffic_to_file(VALUE /* self */, VALUE path) +{ + Check_Type(path, T_STRING); + couchbase::core::logger::configuration configuration{}; + configuration.filename = cb_string_new(path); + couchbase::core::logger::create_protocol_logger(configuration); + return Qnil; +} + static void init_backend(VALUE mCouchbase) { - VALUE cBackend = rb_define_class_under(mCouchbase, "Backend", rb_cBasicObject); + VALUE cBackend = rb_define_class_under(mCouchbase, "Backend", rb_cObject); rb_define_alloc_func(cBackend, cb_Backend_allocate); rb_define_method(cBackend, "open", VALUE_FUNC(cb_Backend_open), 3); rb_define_method(cBackend, "close", VALUE_FUNC(cb_Backend_close), 0); @@ -8604,7 +9151,10 @@ init_backend(VALUE mCouchbase) rb_define_method(cBackend, "document_remove", VALUE_FUNC(cb_Backend_document_remove), 5); rb_define_method(cBackend, "document_remove_multi", VALUE_FUNC(cb_Backend_document_remove_multi), 5); rb_define_method(cBackend, "document_lookup_in", VALUE_FUNC(cb_Backend_document_lookup_in), 6); + rb_define_method(cBackend, "document_lookup_in_any_replica", VALUE_FUNC(cb_Backend_document_lookup_in_any_replica), 6); + rb_define_method(cBackend, "document_lookup_in_all_replicas", VALUE_FUNC(cb_Backend_document_lookup_in_all_replicas), 6); rb_define_method(cBackend, "document_mutate_in", VALUE_FUNC(cb_Backend_document_mutate_in), 6); + rb_define_method(cBackend, "document_scan_create", VALUE_FUNC(cb_Backend_document_scan_create), 5); rb_define_method(cBackend, "document_query", VALUE_FUNC(cb_Backend_document_query), 2); rb_define_method(cBackend, "document_touch", VALUE_FUNC(cb_Backend_document_touch), 6); rb_define_method(cBackend, "document_exists", VALUE_FUNC(cb_Backend_document_exists), 5); @@ -8706,6 +9256,10 @@ init_backend(VALUE mCouchbase) rb_define_singleton_method(cBackend, "query_escape", VALUE_FUNC(cb_Backend_query_escape), 1); rb_define_singleton_method(cBackend, "path_escape", VALUE_FUNC(cb_Backend_path_escape), 1); rb_define_singleton_method(cBackend, "form_encode", VALUE_FUNC(cb_Backend_form_encode), 1); + rb_define_singleton_method(cBackend, + "enable_protocol_logger_to_save_network_traffic_to_file", + VALUE_FUNC(cb_Backend_enable_protocol_logger_to_save_network_traffic_to_file), + 1); } void diff --git 
a/lib/couchbase/cluster.rb b/lib/couchbase/cluster.rb index b2d208ff..02819a59 100644 --- a/lib/couchbase/cluster.rb +++ b/lib/couchbase/cluster.rb @@ -124,7 +124,7 @@ def query(statement, options = Options::Query::DEFAULT) metrics.warning_count = resp[:meta][:metrics][:warning_count] end end - res[:warnings] = resp[:warnings].map { |warn| QueryWarning.new(warn[:code], warn[:message]) } if resp[:warnings] + meta.warnings = resp[:warnings].map { |warn| QueryWarning.new(warn[:code], warn[:message]) } if resp[:warnings] end res.instance_variable_set(:@rows, resp[:rows]) end diff --git a/lib/couchbase/collection.rb b/lib/couchbase/collection.rb index 05c04930..0371f2dc 100644 --- a/lib/couchbase/collection.rb +++ b/lib/couchbase/collection.rb @@ -15,6 +15,7 @@ require "couchbase/errors" require "couchbase/collection_options" require "couchbase/binary_collection" +require "couchbase/key_value_scan" module Couchbase # Provides access to all collection APIs @@ -481,11 +482,68 @@ def lookup_in(id, specs, options = Options::LookupIn::DEFAULT) f.index = field[:index] f.path = field[:path] f.value = field[:value] + f.error = field[:error] end end end end + # Performs lookups to document fragments. Reads from the active node and all available replicas and returns the + # first result found + # + # @param [String] id the document id which is used to uniquely identify it. + # @param [Array] specs the list of specifications which describe the types of the lookups to perform + # @param [Options::LookupInAnyReplica] options request customization + # + # @return [LookupInReplicaResult] + # + # @raise [Error::DocumentIrretrievable] + # @raise [Error::Timeout] + # @raise [Error::CouchbaseError] + # @raise [Error::FeatureNotAvailable] + def lookup_in_any_replica(id, specs, options = Options::LookupInAnyReplica::DEFAULT) + resp = @backend.document_lookup_in_any_replica( + bucket_name, @scope_name, @name, id, + specs.map do |s| + { + opcode: s.type, + xattr: s.xattr?, + path: s.path, + } + end, options.to_backend + ) + extract_lookup_in_replica_result(resp, options) + end + + # Performs lookups to document fragments. Reads from the active node and all available replicas and returns all of + # the results + # + # @param [String] id the document id which is used to uniquely identify it. + # @param [Array] specs the list of specifications which describe the types of the lookups to perform + # @param [Options::LookupInAllReplicas] options request customization + # + # @return [Array] + # + # @raise [Error::DocumentNotFound] + # @raise [Error::Timeout] + # @raise [Error::CouchbaseError] + # @raise [Error::FeatureNotAvailable] + def lookup_in_all_replicas(id, specs, options = Options::LookupInAllReplicas::DEFAULT) + resp = @backend.document_lookup_in_all_replicas( + bucket_name, @scope_name, @name, id, + specs.map do |s| + { + opcode: s.type, + xattr: s.xattr?, + path: s.path, + } + end, options.to_backend + ) + resp.map do |entry| + extract_lookup_in_replica_result(entry, options) + end + end + # Performs mutations to document fragments # # @param [String] id the document id which is used to uniquely identify it. 
@@ -535,6 +593,38 @@ def mutate_in(id, specs, options = Options::MutateIn::DEFAULT) end end + # Performs a key-value scan operation on the collection + # + # @api uncommitted + # + # @param [RangeScan, PrefixScan, SamplingScan] scan_type the type of the scan + # @param [Options::Scan] options request customization + # + # @example Get a sample of up to 5 documents from the collection and store their IDs in an array + # result = collection.scan(SamplingScan.new(5), Options::Scan.new(ids_only: true)) + # ids = result.map { |item| item.id } + # + # @example Get all documents whose ID starts with 'customer_1' and output their content + # result = collection.scan(PrefixScan.new("customer_1")) + # result.each { |item| puts item.content } + # + # @example Get all documents with ID between 'customer_1' and 'customer_2', excluding 'customer_2' and output their content + # result = collection.scan(RangeScan.new( + # from: ScanTerm.new("customer_1"), + # to: ScanTerm.new("customer_2", exclusive: true) + # )) + # result.each { |item| puts item.content } + # + # @return [ScanResults] + def scan(scan_type, options = Options::Scan::DEFAULT) + ScanResults.new( + core_scan_result: @backend.document_scan_create( + @bucket_name, @scope_name, @name, scan_type.to_backend, options.to_backend + ), + transcoder: options.transcoder + ) + end + private def extract_mutation_token(resp) @@ -548,6 +638,24 @@ def extract_mutation_token(resp) end end + def extract_lookup_in_replica_result(resp, options) + LookupInReplicaResult.new do |res| + res.transcoder = options.transcoder + res.cas = resp[:cas] + res.deleted = resp[:deleted] + res.is_replica = resp[:is_replica] + res.encoded = resp[:fields].map do |field| + SubDocumentField.new do |f| + f.exists = field[:exists] + f.index = field[:index] + f.path = field[:path] + f.value = field[:value] + f.error = field[:error] + end + end + end + end + # @api private # TODO: deprecate in 3.1 GetOptions = ::Couchbase::Options::Get diff --git a/lib/couchbase/collection_options.rb b/lib/couchbase/collection_options.rb index e66a7481..6cfc70a1 100644 --- a/lib/couchbase/collection_options.rb +++ b/lib/couchbase/collection_options.rb @@ -15,6 +15,9 @@ require "rubygems/deprecate" require "couchbase/json_transcoder" +require "couchbase/raw_string_transcoder" +require "couchbase/raw_json_transcoder" +require "couchbase/raw_binary_transcoder" require "couchbase/subdoc" require "couchbase/mutation_state" @@ -164,6 +167,8 @@ class LookupInResult # @return [Object] the decoded def content(path_or_index, transcoder = self.transcoder) field = get_field_at_index(path_or_index) + + raise field.error unless field.error.nil? raise Error::PathNotFound, "Path is not found: #{path_or_index}" unless field.exists transcoder.decode(field.value, :json) @@ -186,6 +191,8 @@ def exists?(path_or_index) end return false unless field + raise field.error unless field.error.nil? + field.exists end @@ -227,6 +234,18 @@ def get_field_at_index(path_or_index) end end + class LookupInReplicaResult < LookupInResult + # @return [Boolean] true if the document was read from a replica node + attr_accessor :is_replica + alias replica? is_replica + + # @yieldparam [LookupInReplicaResult] self + def initialize + super + yield self if block_given? 
+ end + end + class MutateInResult < MutationResult # Decodes the content at the given index # @@ -291,10 +310,91 @@ class SubDocumentField # @return [String] path attr_accessor :path + # @return [CouchbaseError] error + attr_accessor :error + # @yieldparam [SubDocumentField] self def initialize yield self if block_given? end end + + class ScanResult + # @return [String] identifier of the document + attr_accessor :id + + # @return [Boolean] whether only ids are returned from this scan + attr_accessor :id_only + + # @return [Integer, nil] holds the CAS value of the fetched document + attr_accessor :cas + + # @return [Integer, nil] the expiration if fetched and present + attr_accessor :expiry + + # @return [JsonTranscoder, RawBinaryTranscoder, RawJsonTranscoder, RawStringTranscoder, #decode] The default + # transcoder which should be used + attr_accessor :transcoder + + def initialize(id:, id_only:, cas: nil, expiry: nil, encoded: nil, flags: nil, transcoder: JsonTranscoder.new) + @id = id + @id_only = id_only + @cas = cas + @expiry = expiry + @encoded = encoded + @flags = flags + @transcoder = transcoder + + yield self if block_given? + end + + # Decodes the content of the document using given (or default transcoder) + # + # @param [JsonTranscoder, RawJsonTranscoder, RawBinaryTranscoder, RawStringTranscoder] transcoder custom transcoder + # + # @return [Object, nil] + def content(transcoder = self.transcoder) + return nil if @encoded.nil? + + transcoder ? transcoder.decode(@encoded, @flags) : @encoded + end + end + + class ScanResults + include Enumerable + + def initialize(core_scan_result:, transcoder:) + @core_scan_result = core_scan_result + @transcoder = transcoder + end + + def each + return enum_for(:each) unless block_given? + + loop do + resp = @core_scan_result.next_item + + break if resp.nil? + + if resp[:id_only] + yield ScanResult.new( + id: resp[:id], + id_only: resp[:id_only], + transcoder: @transcoder + ) + else + yield ScanResult.new( + id: resp[:id], + id_only: resp[:id_only], + cas: resp[:cas], + expiry: resp[:expiry], + encoded: resp[:encoded], + flags: resp[:flags], + transcoder: @transcoder + ) + end + end + end + end end end diff --git a/lib/couchbase/errors.rb b/lib/couchbase/errors.rb index 644ea8d1..52fdc807 100644 --- a/lib/couchbase/errors.rb +++ b/lib/couchbase/errors.rb @@ -183,6 +183,11 @@ class DurableWriteInProgress < CouchbaseError class DurableWriteReCommitInProgress < CouchbaseError end + # Happens when consistency requirements are specified but the partition uuid of the requirements do not align + # with the server + class MutationTokenOutdated < CouchbaseError + end + # Subdocument exception thrown when a path does not exist in the document. The exact meaning of path existence # depends on the operation and inputs. class PathNotFound < CouchbaseError diff --git a/lib/couchbase/key_value_scan.rb b/lib/couchbase/key_value_scan.rb new file mode 100644 index 00000000..d0d9d483 --- /dev/null +++ b/lib/couchbase/key_value_scan.rb @@ -0,0 +1,125 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +module Couchbase + # A scan term used to specify the bounds of a range scan + class ScanTerm + attr_accessor :term # @return [String] + attr_accessor :exclusive # @return [Boolean] + + # Creates an instance of a ScanTerm + # + # @api uncommitted + # + # @param [String] term the key pattern of this term + # @param [Boolean] exclusive specifies if this term is excluded while scanning, the bounds are included by default + def initialize(term, exclusive: false) + @term = term + @exclusive = exclusive + end + + # @api private + def to_backend + { + term: @term, + exclusive: @exclusive, + } + end + end + + # A range scan performs a scan on a range of keys + class RangeScan + attr_accessor :from # @return [ScanTerm, nil] + attr_accessor :to # @return [ScanTerm, nil] + + # Creates an instance of a RangeScan scan type + # + # @api uncommitted + # + # @param [ScanTerm, String, nil] from the lower bound of the range, if set + # @param [ScanTerm, String, nil] to the upper bound of the range, if set + def initialize(from: nil, to: nil) + @from = + if from.nil? || from.instance_of?(ScanTerm) + from + else + ScanTerm.new(from) + end + @to = + if to.nil? || to.instance_of?(ScanTerm) + to + else + ScanTerm.new(to) + end + end + + # @api private + def to_backend + { + scan_type: :range, + from: @from&.to_backend, + to: @to&.to_backend, + } + end + end + + # A prefix scan performs a scan that includes all documents whose keys start with the given prefix + class PrefixScan + attr_accessor :prefix # @return [String] + + # Creates an instance of a PrefixScan scan type + # + # @api uncommitted + # + # @param [String, nil] prefix the prefix all document keys should start with + def initialize(prefix) + @prefix = prefix + end + + # @api private + def to_backend + { + scan_type: :prefix, + prefix: @prefix, + } + end + end + + # A sampling scan performs a scan that randomly selects documents up to a configured limit + class SamplingScan + attr_accessor :limit # @return [Integer] + attr_accessor :seed # @return [Integer, nil] + + # Creates an instance of a SamplingScan scan type + # + # @api uncommitted + # + # @param [Integer] limit the maximum number of documents the sampling scan can return + # @param [Integer, nil] seed the seed used for the random number generator that selects the documents.
If not set, + a seed is generated at random + def initialize(limit, seed = nil) + @limit = limit + @seed = seed + end + + # @api private + def to_backend + { + scan_type: :sampling, + limit: @limit, + seed: @seed, + } + end + end +end diff --git a/lib/couchbase/options.rb b/lib/couchbase/options.rb index a34b9bae..29e93830 100644 --- a/lib/couchbase/options.rb +++ b/lib/couchbase/options.rb @@ -1026,6 +1026,151 @@ def to_backend DEFAULT = LookupIn.new.freeze end + # Options for {Collection#lookup_in_any_replica} + class LookupInAnyReplica < Base + attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)] + + # Creates an instance of options for {Collection#lookup_in_any_replica} + # + # @param [JsonTranscoder, #decode(String)] transcoder used for decoding + # + # @param [Integer, #in_milliseconds, nil] timeout + # @param [Proc, nil] retry_strategy the custom retry strategy, if set + # @param [Hash, nil] client_context the client context data, if set + # @param [Span, nil] parent_span if set holds the parent span, that should be used for this request + # + # @yieldparam [LookupInAnyReplica] self + def initialize(transcoder: JsonTranscoder.new, + timeout: nil, + retry_strategy: nil, + client_context: nil, + parent_span: nil) + super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span) + @transcoder = transcoder + yield self if block_given? + end + + # @api private + def to_backend + { + timeout: Utils::Time.extract_duration(@timeout), + } + end + + # @api private + # @return [Boolean] + attr_accessor :access_deleted + + # @api private + DEFAULT = LookupInAnyReplica.new.freeze + end + + # Options for {Collection#lookup_in_all_replicas} + class LookupInAllReplicas < Base + attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)] + + # Creates an instance of options for {Collection#lookup_in_all_replicas} + # + # @param [JsonTranscoder, #decode(String)] transcoder used for decoding + # + # @param [Integer, #in_milliseconds, nil] timeout + # @param [Proc, nil] retry_strategy the custom retry strategy, if set + # @param [Hash, nil] client_context the client context data, if set + # @param [Span, nil] parent_span if set holds the parent span, that should be used for this request + # + # @yieldparam [LookupInAllReplicas] self + def initialize(transcoder: JsonTranscoder.new, + timeout: nil, + retry_strategy: nil, + client_context: nil, + parent_span: nil) + super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span) + @transcoder = transcoder + yield self if block_given?
+ end + + # @api private + def to_backend + { + timeout: Utils::Time.extract_duration(@timeout), + } + end + + # @api private + DEFAULT = LookupInAllReplicas.new.freeze + end + + # Options for {Collection#scan} + class Scan < Base + attr_accessor :ids_only # @return [Boolean] + attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)] + attr_accessor :mutation_state # @return [MutationState, nil] + attr_accessor :batch_byte_limit # @return [Integer, nil] + attr_accessor :batch_item_limit # @return [Integer, nil] + attr_accessor :concurrency # @return [Integer, nil] + + # Creates an instance of options for {Collection#scan} + # + # @param [Boolean] ids_only if set to true, the content of the documents is not included in the results + # @param [JsonTranscoder, #decode(String)] transcoder used for decoding + # @param [MutationState, nil] mutation_state sets the mutation tokens this scan should be consistent with + # @param [Integer, nil] batch_byte_limit limits the maximum number of bytes that are sent from the server + # to the client on each partition batch, defaults to 15,000 + # @param [Integer, nil] batch_item_limit limits the maximum number of items that are sent from the server + # to the client on each partition batch, defaults to 50 + # @param [Integer, nil] concurrency specifies the maximum number of partitions that can be scanned concurrently, + # defaults to 1 + # + # @param [Integer, #in_milliseconds, nil] timeout + # @param [Proc, nil] retry_strategy the custom retry strategy, if set + # @param [Hash, nil] client_context the client context data, if set + # @param [Span, nil] parent_span if set holds the parent span, that should be used for this request + # + # @yieldparam [Scan] self + def initialize(ids_only: false, + transcoder: JsonTranscoder.new, + mutation_state: nil, + batch_byte_limit: nil, + batch_item_limit: nil, + concurrency: nil, + timeout: nil, + retry_strategy: nil, + client_context: nil, + parent_span: nil) + super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span) + @ids_only = ids_only + @transcoder = transcoder + @mutation_state = mutation_state + @batch_byte_limit = batch_byte_limit + @batch_item_limit = batch_item_limit + @concurrency = concurrency + yield self if block_given?
+ end + + # Sets the mutation tokens this query should be consistent with + # + # @note overrides consistency level set by {#scan_consistency=} + # + # @param [MutationState] mutation_state the mutation state containing the mutation tokens + def consistent_with(mutation_state) + @mutation_state = mutation_state + end + + # @api private + def to_backend + { + timeout: Utils::Time.extract_duration(@timeout), + ids_only: @ids_only, + mutation_state: @mutation_state.to_a, + batch_byte_limit: @batch_byte_limit, + batch_item_limit: @batch_item_limit, + concurrency: @concurrency, + } + end + + DEFAULT = Scan.new.freeze + end + # Options for {BinaryCollection#append} class Append < Base attr_accessor :cas # @return [Integer] @@ -1849,6 +1994,7 @@ class Query < Base attr_accessor :profile # @return [Symbol] attr_accessor :flex_index # @return [Boolean] attr_accessor :preserve_expiry # @return [Boolean] + attr_accessor :use_replica # @return [Boolean, nil] attr_accessor :scope_qualifier # @return [String] attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)] @@ -1894,6 +2040,8 @@ class Query < Base # @param [Boolean, nil] flex_index Tells the query engine to use a flex index (utilizing the search service) # @param [Boolean, nil] preserve_expiry Tells the query engine to preserve expiration values set on any documents # modified by this query. + # @param [Boolean, nil] use_replica Specifies that the query engine should use replica nodes for KV fetches if + # the active node is down. If not provided, the server default will be used # @param [String, nil] scope_qualifier Associate scope qualifier (also known as +query_context+) with the query. # The qualifier must be in form +{bucket_name}.{scope_name}+ or +default:{bucket_name}.{scope_name}+. # @param [JsonTranscoder] transcoder to decode rows @@ -1925,6 +2073,7 @@ def initialize(adhoc: true, profile: :off, flex_index: nil, preserve_expiry: nil, + use_replica: nil, scope_qualifier: nil, scan_consistency: :not_bounded, mutation_state: nil, @@ -1950,6 +2099,7 @@ def initialize(adhoc: true, @profile = profile @flex_index = flex_index @preserve_expiry = preserve_expiry + @use_replica = use_replica @scope_qualifier = scope_qualifier @scan_consistency = scan_consistency @mutation_state = mutation_state @@ -2043,6 +2193,7 @@ def to_backend(scope_name: nil, bucket_name: nil) readonly: @readonly, flex_index: @flex_index, preserve_expiry: @preserve_expiry, + use_replica: @use_replica, scan_wait: Utils::Time.extract_duration(@scan_wait), scan_cap: @scan_cap, pipeline_batch: @pipeline_batch, diff --git a/lib/couchbase/scope.rb b/lib/couchbase/scope.rb index c3b009b9..fb99ce3e 100644 --- a/lib/couchbase/scope.rb +++ b/lib/couchbase/scope.rb @@ -79,7 +79,7 @@ def query(statement, options = Options::Query::DEFAULT) metrics.warning_count = resp[:meta][:metrics][:warning_count] end end - res[:warnings] = resp[:warnings].map { |warn| Cluster::QueryWarning.new(warn[:code], warn[:message]) } if resp[:warnings] + meta.warnings = resp[:warnings].map { |warn| Cluster::QueryWarning.new(warn[:code], warn[:message]) } if resp[:warnings] end res.instance_variable_set(:@rows, resp[:rows]) end diff --git a/lib/couchbase/utils/time.rb b/lib/couchbase/utils/time.rb index f1c962c2..e5341bbb 100644 --- a/lib/couchbase/utils/time.rb +++ b/lib/couchbase/utils/time.rb @@ -48,8 +48,21 @@ def extract_expiry_time(time_or_duration) end end + # This method converts its argument to milliseconds + # + # 1. Integer values are interpreted as a number of milliseconds + # 2. 
If the argument is a Duration-like object and responds to #in_milliseconds, + # then use it and convert result to Integer + # 3. Otherwise invoke #to_i on the argument and interpret it as a number of milliseconds def extract_duration(number_or_duration) - number_or_duration.respond_to?(:in_milliseconds) ? number_or_duration.public_send(:in_milliseconds) : number_or_duration + return unless number_or_duration + return number_or_duration if number_or_duration.class == Integer # rubocop:disable Style/ClassEqualityComparison avoid overrides of #is_a?, #kind_of? + + if number_or_duration.respond_to?(:in_milliseconds) + number_or_duration.public_send(:in_milliseconds) + else + number_or_duration + end.to_i end end end diff --git a/lib/couchbase/version.rb b/lib/couchbase/version.rb index 90b7b157..2f2f64f1 100644 --- a/lib/couchbase/version.rb +++ b/lib/couchbase/version.rb @@ -19,5 +19,5 @@ module Couchbase # $ ruby -rcouchbase -e 'pp Couchbase::VERSION' # {:sdk=>"3.4.0", :ruby_abi=>"3.1.0", :revision=>"416fe68e6029ec8a4c40611cf6e6b30d3b90d20f"} VERSION = {} unless defined?(VERSION) # rubocop:disable Style/MutableConstant - VERSION.update(:sdk => "3.4.3".freeze) + VERSION.update(:sdk => "3.4.4".freeze) end diff --git a/test/query_test.rb b/test/query_test.rb index 764d10e2..dcdc5828 100644 --- a/test/query_test.rb +++ b/test/query_test.rb @@ -224,8 +224,8 @@ def retry_on(exception) def test_scoped_query skip("The server does not support scoped queries (#{env.server_version})") unless env.server_version.supports_scoped_queries? - scope_name = uniq_id(:scope).delete(".")[0, 30] - collection_name = uniq_id(:collection).delete(".")[0, 30] + scope_name = uniq_id(:scope).delete("-")[0, 30] + collection_name = uniq_id(:collection).delete("-")[0, 30] manager = @bucket.collections ns_uid = manager.create_scope(scope_name) diff --git a/test/scan_test.rb b/test/scan_test.rb new file mode 100644 index 00000000..2f8afee8 --- /dev/null +++ b/test/scan_test.rb @@ -0,0 +1,394 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require_relative "test_helper" + +module Couchbase + class ScanTest < Minitest::Test + include TestUtilities + + BATCH_BYTE_LIMIT_VALUES = [0, 1, 25, 100].freeze + BATCH_ITEM_LIMIT_VALUES = [0, 1, 25, 100].freeze + CONCURRENCY_VALUES = [2, 4, 8, 32, 128].freeze + + def setup + skip("#{name}: CAVES does not support range scan") if use_caves? + skip("#{name}: Server does not support range scan (#{env.server_version})") unless env.server_version.supports_range_scan? 
+ + connect + @bucket = @cluster.bucket(env.bucket) + @collection = @bucket.default_collection + @shared_prefix = "scan-test" + @test_ids = Set.new + + 100.times do |i| + s = i.to_s.rjust(2, "0") + id = "#{@shared_prefix}-#{s}" + @collection.upsert(id, {num: i}) + @test_ids << id + end + end + + def teardown + @test_ids.each do |id| + @collection.remove(id) + end + end + + def validate_scan(scan_result, expected_ids, ids_only: false) + items = [] + scan_result.each do |item| + items << item + + assert_equal(ids_only, item.id_only) + end + test_ids_returned = items.to_set(&:id) & @test_ids + + assert_equal(expected_ids.to_set, test_ids_returned) + assert_equal(expected_ids.size, test_ids_returned.size) + + return if ids_only + + items.each do |item| + if test_ids_returned.include? item.id + refute_equal(0, item.cas) + assert_equal(item.id, "#{@shared_prefix}-#{item.content['num'].to_s.rjust(2, '0')}") + end + end + end + + def validate_sampling_scan(scan_result, limit, ids_only: false) + items = [] + scan_result.each do |item| + items << item + + assert_equal(ids_only, item.id_only) + end + assert(items.size <= limit) + + return if ids_only + + test_ids_returned = items.to_set(&:id) & @test_ids + items.each do |item| + if test_ids_returned.include? item.id + refute_equal(0, item.cas) + assert_equal(item.id, "#{@shared_prefix}-#{item.content['num'].to_s.rjust(2, '0')}") + end + end + end + + def test_simple_prefix_scan + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1")) + validate_scan(scan_result, expected_ids) + end + + def test_simple_range_scan + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_simple_sampling_scan + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit)) + validate_sampling_scan(scan_result, limit) + end + + def test_range_scan_exclusive_from + expected_ids = (1..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-29") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_exclusive_to + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..8).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: false), + to: ScanTerm.new("#{@shared_prefix}-29", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_both_exclusive + expected_ids = (1..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..8).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-29", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_default_from + expected_ids = (0..9).map { |i| "#{@shared_prefix}-0#{i}" } + scan_result = @collection.scan( + RangeScan.new( + to: ScanTerm.new("#{@shared_prefix}-09") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_default_to + expected_ids = (0..9).map { |i| 
"#{@shared_prefix}-9#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-90") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_both_default + scan_result = @collection.scan(RangeScan.new) + validate_scan(scan_result, @test_ids) + end + + def test_range_scan_ids_only + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(ids_only: true) + ) + validate_scan(scan_result, expected_ids, ids_only: true) + end + + def test_range_scan_explicitly_with_content + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(ids_only: false) + ) + validate_scan(scan_result, expected_ids) + end + + def test_prefix_scan_ids_only + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(ids_only: true)) + validate_scan(scan_result, expected_ids, ids_only: true) + end + + def test_sampling_scan_ids_only + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(ids_only: true)) + validate_sampling_scan(scan_result, limit, ids_only: true) + end + + def test_sampling_scan_with_seed + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit, 42), Options::Scan.new(ids_only: true)) + validate_sampling_scan(scan_result, limit, ids_only: true) + end + + def test_range_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_byte_limit: b) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(batch_byte_limit: b)) + validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(batch_byte_limit: b)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(concurrency: c) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(concurrency: c)) + 
validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(concurrency: c)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_batch_item_limit + BATCH_ITEM_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_item_limit: b) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_batch_item_limit + BATCH_ITEM_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(batch_item_limit: b)) + validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_batch_item_limit + limit = 11 + BATCH_ITEM_LIMIT_VALUES.each do |b| + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(batch_item_limit: b)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_multiple_options + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_byte_limit: 100, batch_item_limit: 20, ids_only: false) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_collection_does_not_exist + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + assert_raises(Error::CollectionNotFound) do + collection.scan(RangeScan.new) + end + end + + def test_range_scan_same_from_to + expected_ids = ["#{@shared_prefix}-10"] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-10") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_same_from_to_exclusive + expected_ids = [] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-10", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_inverted_bounds + expected_ids = [] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-20", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-10", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_sampling_scan_non_positive_limit + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + assert_raises(Error::InvalidArgument) do + collection.scan(SamplingScan.new(0)) + end + end + + def test_range_scan_zero_concurrency + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + assert_raises(Error::InvalidArgument) do + collection.scan(RangeScan.new, Options::Scan.new(concurrency: 0)) + end + end + end + + class ScanNotSupportedTest < Minitest::Test + include TestUtilities + + def setup + skip("#{name}: Server supports range scan (#{env.server_version})") if env.server_version.supports_range_scan? 
+ + connect + @bucket = @cluster.bucket(env.bucket) + @collection = @bucket.default_collection + end + + def test_range_scan_feature_not_available + assert_raises(Error::FeatureNotAvailable) do + @collection.scan(RangeScan.new) + end + end + end +end diff --git a/test/subdoc_test.rb b/test/subdoc_test.rb index 06465744..438d5ea9 100644 --- a/test/subdoc_test.rb +++ b/test/subdoc_test.rb @@ -1310,5 +1310,261 @@ def test_create_tombstones assert_equal({"field" => "b"}, res.content(0)) assert_predicate res, :deleted?, "the document should be marked as 'deleted'" end + + def test_lookup_in_any_replica_get + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.get("value"), + ]) + + assert_equal 42, res.content(0) + assert_respond_to res, :replica? + end + + def test_lookup_in_all_replicas_get + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.get("value"), + ]) + + refute_empty res + + res.each do |entry| + assert_equal 42, entry.content(0) + assert_respond_to entry, :replica? + end + end + + def test_lookup_in_any_replica_get_doc + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.get(""), + ]) + + assert_equal document, res.content(0) + assert_respond_to res, :replica? + end + + def test_lookup_in_all_replicas_get_doc + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.get(""), + ]) + + refute_empty res + + res.each do |entry| + assert_equal document, entry.content(0) + assert_respond_to entry, :replica? + end + end + + def test_lookup_in_any_replica_exists + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? 
+ + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + options = Options::LookupInAnyReplica.new + res = @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.exists("value"), + LookupInSpec.exists("foo"), + ], options) + + assert res.exists?(0) + refute res.exists?(1) + assert_respond_to res, :replica? + end + + def test_lookup_in_all_replicas_exist + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + options = Options::LookupInAllReplicas.new + res = @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.exists("value"), + LookupInSpec.exists("foo"), + ], options) + + refute_empty res + + res.each do |entry| + assert entry.exists?(0) + refute entry.exists?(1) + assert_respond_to entry, :replica? + end + end + + def test_lookup_in_any_replica_bad_key + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + assert_raises(Error::DocumentIrretrievable) do + @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.get("value"), + ]) + end + end + + def test_lookup_in_all_replicas_bad_key + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + assert_raises(Error::DocumentNotFound) do + @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.get("value"), + ]) + end + end + + def test_lookup_in_path_invalid + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document) + + res = @collection.lookup_in(doc_id, [ + LookupInSpec.get("value.."), + ]) + + assert_raises(Error::PathInvalid) do + res.exists?(0) + end + assert_raises(Error::PathInvalid) do + res.content(0) + end + end + + def test_lookup_in_path_mismatch + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document) + + res = @collection.lookup_in(doc_id, [ + LookupInSpec.count("value"), + ]) + + assert_raises(Error::PathMismatch) do + res.exists?(0) + end + assert_raises(Error::PathMismatch) do + res.content(0) + end + end + + def test_lookup_in_any_replica_path_invalid + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.count("value.."), + ]) + + assert_raises(Error::PathInvalid) do + res.exists?(0) + end + assert_raises(Error::PathInvalid) do + res.content(0) + end + end + + def test_lookup_in_any_replica_path_mismatch + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? 
+ skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_any_replica(doc_id, [ + LookupInSpec.count("value"), + ]) + + assert_raises(Error::PathMismatch) do + res.exists?(0) + end + assert_raises(Error::PathMismatch) do + res.content(0) + end + end + + def test_lookup_in_all_replicas_path_invalid + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.count("value.."), + ]) + + res.each do |entry| + assert_raises(Error::PathInvalid) do + entry.exists?(0) + end + assert_raises(Error::PathInvalid) do + entry.content(0) + end + end + end + + def test_lookup_in_all_replicas_path_mismatch + skip("#{name}: CAVES does not support subdoc read from replica yet") if use_caves? + skip("#{name}: Server does not support subdoc read from replica") unless env.server_version.supports_subdoc_read_from_replica? + + doc_id = uniq_id(:foo) + document = {"value" => 42} + @collection.upsert(doc_id, document, Options::Upsert.new(durability_level: :majority_and_persist_to_active)) + + res = @collection.lookup_in_all_replicas(doc_id, [ + LookupInSpec.count("value"), + ]) + + res.each do |entry| + assert_raises(Error::PathMismatch) do + entry.exists?(0) + end + assert_raises(Error::PathMismatch) do + entry.content(0) + end + end + end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index a9f2e4f8..84564dad 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -55,6 +55,14 @@ def cheshire_cat? @version >= Gem::Version.create("7.0.0") end + def elixir? + @version >= Gem::Version.create("7.5.0") + end + + def trinity? + @version >= Gem::Version.create("7.6.0") + end + def supports_collections? cheshire_cat? end @@ -82,6 +90,14 @@ def supports_preserve_expiry? def is_rcbc_408_applicable? @version < Gem::Version.create("7.0.0") end + + def supports_range_scan? + elixir? + end + + def supports_subdoc_read_from_replica? + elixir? + end end require "couchbase" @@ -178,7 +194,7 @@ def time_travel(duration) def uniq_id(name) parent = caller_locations&.first prefix = "#{File.basename(parent&.path, '.rb')}_#{parent&.lineno}" - "#{prefix}_#{name}_#{Time.now.to_f.to_s.reverse}" + "#{prefix}_#{name}_#{Time.now.to_f.to_s.reverse}".sub(".", "-") end def load_raw_test_dataset(dataset) diff --git a/test/utils_time_test.rb b/test/utils_time_test.rb new file mode 100644 index 00000000..4d6fe340 --- /dev/null +++ b/test/utils_time_test.rb @@ -0,0 +1,58 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +require "test_helper" + +require "couchbase/utils/time" + +require "active_support/core_ext/numeric/time" +require "active_support/duration" + +class MyDuration + def initialize(seconds) + @value = seconds + end + + def in_milliseconds + @value * 1000 + end +end + +class UtilsTimeTest < Minitest::Test + def test_accepts_rails_duration + duration = 42.seconds + + assert_kind_of ActiveSupport::Duration, duration + assert_equal 42_000, Couchbase::Utils::Time.extract_duration(duration) + end + + def test_accepts_duration_like_objects + assert_equal 42_000, Couchbase::Utils::Time.extract_duration(MyDuration.new(42)) + end + + def test_accepts_floats + duration = Couchbase::Utils::Time.extract_duration(0.42) + + assert_kind_of Integer, duration + assert_equal 420, duration + end + + def test_interpret_integer_as_a_milliseconds_literal + assert_equal 42, Couchbase::Utils::Time.extract_duration(42) + end + + def test_ignores_nil_argument + assert_nil Couchbase::Utils::Time.extract_duration(nil) + end +end
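
The new `Options::Scan#consistent_with` shown in the options diff above takes a `MutationState`, so a KV range scan can be required to observe a prior write. A minimal sketch of that flow, assuming `collection` is an already-opened `Couchbase::Collection` and that the document id, content, and prefix used here are purely illustrative:

```ruby
require "couchbase"
include Couchbase # for brevity, as in the bundled examples

# `collection` is assumed to be an already-opened Couchbase::Collection.
res = collection.upsert("user_42", {"name" => "Alice"})

# Require the scan to observe that upsert before returning results;
# per the option's docs, this overrides any scan consistency level.
options = Options::Scan.new
options.consistent_with(MutationState.new(res.mutation_token))

collection.scan(PrefixScan.new("user_"), options).each do |item|
  puts item.id
end
```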
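
The `use_replica` flag added to `Options::Query` is deliberately tri-state: `nil` defers to the server default, while `true`/`false` override it. A short sketch of passing it through, assuming `cluster` is an already-connected `Couchbase::Cluster` and the `travel-sample` bucket is loaded:

```ruby
require "couchbase"
include Couchbase # for brevity

# `cluster` is assumed to be an already-connected Couchbase::Cluster.
# Allow the query engine to fall back to replica nodes for KV fetches
# if the active node is down.
options = Options::Query.new(use_replica: true)
result = cluster.query("SELECT META().id AS id FROM `travel-sample` LIMIT 5", options)
result.rows.each { |row| puts row["id"] }
```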
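
For reference, the normalisation rules documented on `Utils::Time.extract_duration` work out as below. These are evaluated expressions under the implementation in this diff; the plain-`Float` case assumes ActiveSupport's `Numeric#in_milliseconds` is *not* loaded, whereas the commented lines assume it is (as in `test_accepts_floats`):

```ruby
require "couchbase/utils/time"

Couchbase::Utils::Time.extract_duration(250)  # => 250  (Integers are already milliseconds)
Couchbase::Utils::Time.extract_duration(nil)  # => nil  (nil passes through)
Couchbase::Utils::Time.extract_duration(0.25) # => 0    (plain Float: #to_i truncates)

# With ActiveSupport loaded, numerics and durations gain #in_milliseconds:
#   require "active_support/core_ext/numeric/time"
#   Couchbase::Utils::Time.extract_duration(42.seconds) # => 42_000
#   Couchbase::Utils::Time.extract_duration(0.25)       # => 250
```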