diff --git a/src/libpmemobj/memops.c b/src/libpmemobj/memops.c index 591730f341f3ca3a543daaafe3f9621215403b5c..8bb9fe515919f86a892977614443160b47bfcabe 100644 --- a/src/libpmemobj/memops.c +++ b/src/libpmemobj/memops.c @@ -515,7 +515,8 @@ operation_user_buffer_range_cmp(const void *lhs, const void *rhs) * it returns 0 */ static int -operation_user_buffer_try_insert(PMEMobjpool *pop, void *addr, size_t size) +operation_user_buffer_try_insert(PMEMobjpool *pop, + struct user_buffer_def *userbuf) { int ret = 0; @@ -524,7 +525,7 @@ operation_user_buffer_try_insert(PMEMobjpool *pop, void *addr, size_t size) util_mutex_lock(&pop->ulog_user_buffers.lock); - void *addr_end = (char *)addr + size; + void *addr_end = (char *)userbuf->addr + userbuf->size; struct user_buffer_def search; search.addr = addr_end; struct ravl_node *n = ravl_find(pop->ulog_user_buffers.map, @@ -533,18 +534,14 @@ operation_user_buffer_try_insert(PMEMobjpool *pop, void *addr, size_t size) struct user_buffer_def *r = ravl_data(n); void *r_end = (char *)r->addr + r->size; - if (r_end >= addr && r->addr <= addr_end) { + if (r_end > userbuf->addr && r->addr < addr_end) { /* what was found overlaps with what is being added */ ret = -1; goto out; } } - struct user_buffer_def range; - range.addr = addr; - range.size = size; - - if (ravl_emplace_copy(pop->ulog_user_buffers.map, &range) == -1) { + if (ravl_emplace_copy(pop->ulog_user_buffers.map, userbuf) == -1) { ASSERTne(errno, EEXIST); ret = -1; } @@ -555,22 +552,24 @@ out: } /* - * operation_add_user_buffer -- add user buffer to the ulog + * operation_user_buffer_verify_align -- verify whether the provided buffer + * can be used as a transaction log and, if so, perform necessary alignments */ int -operation_add_user_buffer(struct operation_context *ctx, - void *addr, size_t size) +operation_user_buffer_verify_align(struct operation_context *ctx, + struct user_buffer_def *userbuf) { /* * Address of the buffer has to be aligned up, and the size * has to be aligned down, taking into account the number of bytes - * the address was incremented by. The remaining size, has to be large + * the address was incremented by. The remaining size has to be large * enough to contain the header and at least one ulog entry.
*/ - uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, addr); + uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, + userbuf->addr); ptrdiff_t size_diff = (intptr_t)ulog_by_offset(buffer_offset, - ctx->p_ops) - (intptr_t)addr; - ssize_t capacity_unaligned = (ssize_t)size - size_diff + ctx->p_ops) - (intptr_t)userbuf->addr; + ssize_t capacity_unaligned = (ssize_t)userbuf->size - size_diff - (ssize_t)sizeof(struct ulog); if (capacity_unaligned <= (ssize_t)CACHELINE_SIZE) { ERR("Capacity insufficient"); @@ -580,14 +579,29 @@ operation_add_user_buffer(struct operation_context *ctx, size_t capacity_aligned = ALIGN_DOWN((size_t)capacity_unaligned, CACHELINE_SIZE); - if (operation_user_buffer_try_insert(ctx->p_ops->base, - ulog_by_offset(buffer_offset, ctx->p_ops), - capacity_aligned + sizeof(struct ulog))) { + userbuf->addr = ulog_by_offset(buffer_offset, ctx->p_ops); + userbuf->size = capacity_aligned + sizeof(struct ulog); + + if (operation_user_buffer_try_insert(ctx->p_ops->base, userbuf)) { ERR("Buffer currently used"); return -1; } - ulog_construct(buffer_offset, capacity_aligned, ctx->ulog->gen_num, + return 0; +} + +/* + * operation_add_user_buffer -- add user buffer to the ulog + */ +void +operation_add_user_buffer(struct operation_context *ctx, + struct user_buffer_def *userbuf) +{ + uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, + userbuf->addr); + size_t capacity = userbuf->size - sizeof(struct ulog); + + ulog_construct(buffer_offset, capacity, ctx->ulog->gen_num, 1, ULOG_USER_OWNED, ctx->p_ops); struct ulog *last_log; @@ -604,10 +618,8 @@ operation_add_user_buffer(struct operation_context *ctx, pmemops_persist(ctx->p_ops, &last_log->next, next_size); VEC_PUSH_BACK(&ctx->next, buffer_offset); - ctx->ulog_capacity += capacity_aligned; + ctx->ulog_capacity += capacity; operation_set_any_user_buffer(ctx, 1); - - return 0; } /* @@ -798,28 +810,48 @@ operation_finish(struct operation_context *ctx, unsigned flags) ASSERTeq(ctx->in_progress, 1); ctx->in_progress = 0; - if (ctx->ulog_any_user_buffer) + /* + * Cleanup of the ulogs is only necessary when either a user log + * has been appended (and now needs to be removed), or the log has + * been written to. + */ + int cleanup = 0; + + if (ctx->ulog_any_user_buffer) { flags |= ULOG_ANY_USER_BUFFER; + cleanup = 1; + } if (ctx->type == LOG_TYPE_REDO && ctx->pshadow_ops.offset != 0) { operation_process(ctx); - } else if (ctx->type == LOG_TYPE_UNDO && - (ctx->total_logged != 0 || ctx->ulog_any_user_buffer)) { - /* - * It is not necessary to recalculate capacity - * and rebuild vector if none of the ulogs have been freed.
- */ - if (!ulog_clobber_data(ctx->ulog, + cleanup = 1; + } + + if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0) + cleanup = 1; + + if (!cleanup) + return; + + if (ctx->type == LOG_TYPE_UNDO) { + int ret = ulog_clobber_data(ctx->ulog, ctx->total_logged, ctx->ulog_base_nbytes, &ctx->next, ctx->ulog_free, operation_user_buffer_remove, - ctx->p_ops, flags)) + ctx->p_ops, flags); + if (ret == 0) + return; + } else if (ctx->type == LOG_TYPE_REDO) { + int ret = ulog_free_next(ctx->ulog, ctx->p_ops, + ctx->ulog_free, operation_user_buffer_remove, + flags); + if (ret == 0) return; - - /* clobbering might have shrunk the ulog */ - ctx->ulog_capacity = ulog_capacity(ctx->ulog, - ctx->ulog_base_nbytes, ctx->p_ops); - VEC_CLEAR(&ctx->next); - ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops); } + + /* clobbering shrunk the ulog */ + ctx->ulog_capacity = ulog_capacity(ctx->ulog, + ctx->ulog_base_nbytes, ctx->p_ops); + VEC_CLEAR(&ctx->next); + ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops); } diff --git a/src/libpmemobj/memops.h b/src/libpmemobj/memops.h index 37ddde8c31a31f1fa9bbcaad3242303f36beae6a..455c858d7f376979e063f548e7e19021526a6616 100644 --- a/src/libpmemobj/memops.h +++ b/src/libpmemobj/memops.h @@ -90,8 +90,10 @@ int operation_add_entry(struct operation_context *ctx, int operation_add_typed_entry(struct operation_context *ctx, void *ptr, uint64_t value, ulog_operation_type type, enum operation_log_type log_type); -int operation_add_user_buffer(struct operation_context *ctx, - void *addr, size_t size); +int operation_user_buffer_verify_align(struct operation_context *ctx, + struct user_buffer_def *userbuf); +void operation_add_user_buffer(struct operation_context *ctx, + struct user_buffer_def *userbuf); void operation_set_auto_reserve(struct operation_context *ctx, int auto_reserve); void operation_set_any_user_buffer(struct operation_context *ctx, diff --git a/src/libpmemobj/tx.c b/src/libpmemobj/tx.c index 045e5128d84d29cd7cbd83ec8b2cad2857d7495a..7ac21ae256a39a66a60947122c350c202845346d 100644 --- a/src/libpmemobj/tx.c +++ b/src/libpmemobj/tx.c @@ -62,6 +62,8 @@ struct tx { struct ravl *ranges; VEC(, struct pobj_action) actions; + VEC(, struct user_buffer_def) redo_userbufs; + size_t redo_userbufs_capacity; pmemobj_tx_callback stage_callback; void *stage_callback_arg; @@ -187,15 +189,31 @@ obj_tx_abort_null(int errnum) FATAL("%s called in invalid stage %d", __func__, (tx)->stage);\ } while (0) +/* + * tx_action_reserve -- (internal) reserve space for the given number of actions + */ +static int +tx_action_reserve(struct tx *tx, size_t n) +{ + size_t entries_size = (VEC_SIZE(&tx->actions) + n) * + sizeof(struct ulog_entry_val); + + /* take the provided user buffers into account when reserving */ + entries_size -= MIN(tx->redo_userbufs_capacity, entries_size); + + if (operation_reserve(tx->lane->external, entries_size) != 0) + return -1; + + return 0; +} + /* * tx_action_add -- (internal) reserve space and add a new tx action */ static struct pobj_action * tx_action_add(struct tx *tx) { - size_t entries_size = (VEC_SIZE(&tx->actions) + 1) * - sizeof(struct ulog_entry_val); - if (operation_reserve(tx->lane->external, entries_size) != 0) + if (tx_action_reserve(tx, 1) != 0) return NULL; VEC_INC_BACK(&tx->actions); @@ -454,7 +472,6 @@ tx_abort(PMEMobjpool *pop, struct lane *lane) ravl_delete_cb(tx->ranges, tx_clean_range, pop); palloc_cancel(&pop->heap, VEC_ARR(&tx->actions), VEC_SIZE(&tx->actions)); - VEC_CLEAR(&tx->actions); tx->ranges = NULL; } @@ -691,9 +708,26 
@@ tx_construct_user_buffer(struct tx *tx, void *addr, size_t size, if (outer_tx && !operation_get_any_user_buffer(ctx)) operation_free_logs(ctx, ULOG_ANY_USER_BUFFER); - if (operation_add_user_buffer(ctx, addr, size)) + struct user_buffer_def userbuf = {addr, size}; + if (operation_user_buffer_verify_align(ctx, &userbuf) != 0) goto err; + if (type == TX_LOG_TYPE_INTENT) { + /* + * The redo log context is not used until transaction commit + * and cannot be touched before then, so the user buffers have + * to be stored and added to the operation at commit time. + * This is because atomic operations can be executed + * independently in the same lane as a running transaction. + */ + if (VEC_PUSH_BACK(&tx->redo_userbufs, userbuf) != 0) + goto err; + tx->redo_userbufs_capacity += + userbuf.size - TX_INTENT_LOG_BUFFER_OVERHEAD; + } else { + operation_add_user_buffer(ctx, &userbuf); + } + return 0; err: @@ -726,6 +760,8 @@ pmemobj_tx_begin(PMEMobjpool *pop, jmp_buf env, ...) operation_start(tx->lane->undo); VEC_INIT(&tx->actions); + VEC_INIT(&tx->redo_userbufs); + tx->redo_userbufs_capacity = 0; PMDK_SLIST_INIT(&tx->tx_entries); PMDK_SLIST_INIT(&tx->tx_locks); @@ -911,8 +947,6 @@ static void tx_post_commit(struct tx *tx) { operation_finish(tx->lane->undo, 0); - - VEC_CLEAR(&tx->actions); } /* @@ -947,6 +981,11 @@ pmemobj_tx_commit(void) pmemops_drain(&pop->p_ops); operation_start(tx->lane->external); + + struct user_buffer_def *userbuf; + VEC_FOREACH_BY_PTR(userbuf, &tx->redo_userbufs) + operation_add_user_buffer(tx->lane->external, userbuf); + palloc_publish(&pop->heap, VEC_ARR(&tx->actions), VEC_SIZE(&tx->actions), tx->lane->external); @@ -1001,6 +1040,7 @@ pmemobj_tx_end(void) tx->pop = NULL; tx->stage = TX_STAGE_NONE; VEC_DELETE(&tx->actions); + VEC_DELETE(&tx->redo_userbufs); if (tx->stage_callback) { pmemobj_tx_callback cb = tx->stage_callback; @@ -1751,10 +1791,7 @@ pmemobj_tx_publish(struct pobj_action *actv, size_t actvcnt) ASSERT_TX_STAGE_WORK(tx); PMEMOBJ_API_START(); - size_t entries_size = (VEC_SIZE(&tx->actions) + actvcnt) * - sizeof(struct ulog_entry_val); - - if (operation_reserve(tx->lane->external, entries_size) != 0) { + if (tx_action_reserve(tx, actvcnt) != 0) { int ret = obj_tx_abort_err(ENOMEM); PMEMOBJ_API_END(); return ret; diff --git a/src/test/obj_memops/obj_memops.c b/src/test/obj_memops/obj_memops.c index e95dfbf9a09d60c23a11d3ce9bba464e3ea3aca2..8869948b554c0455459b669568b0af668b2840d8 100644 --- a/src/test/obj_memops/obj_memops.c +++ b/src/test/obj_memops/obj_memops.c @@ -86,6 +86,12 @@ pmalloc_redo_extend(void *base, uint64_t *redo, uint64_t gen_num) 0, OBJ_INTERNAL_OBJECT_MASK, 0); } +static void +test_free_entry(void *base, uint64_t *next) +{ + /* noop for fake ulog entries */ +} + static void test_set_entries(PMEMobjpool *pop, struct operation_context *ctx, struct test_object *object, @@ -177,7 +183,8 @@ test_redo(PMEMobjpool *pop, struct test_object *object) { struct operation_context *ctx = operation_new( (struct ulog *)&object->redo, TEST_ENTRIES, - pmalloc_redo_extend, NULL, &pop->p_ops, LOG_TYPE_REDO); + pmalloc_redo_extend, (ulog_free_fn)pfree, + &pop->p_ops, LOG_TYPE_REDO); test_set_entries(pop, ctx, object, 10, FAIL_NONE); clear_test_values(object); @@ -201,7 +208,7 @@ test_redo(PMEMobjpool *pop, struct test_object *object) /* verify that rebuilding redo_next works */ ctx = operation_new( (struct ulog *)&object->redo, TEST_ENTRIES, - NULL, NULL, &pop->p_ops, LOG_TYPE_REDO); + NULL, test_free_entry, &pop->p_ops, LOG_TYPE_REDO); test_set_entries(pop, ctx, object,
100, 0); clear_test_values(object); @@ -484,7 +491,7 @@ test_undo_log_reuse() struct operation_context *ctx = operation_new( (struct ulog *)first, ULOG_SIZE, - NULL, NULL, + NULL, test_free_entry, &ops, LOG_TYPE_UNDO); size_t nentries = 0; diff --git a/src/test/obj_ulog_size/TESTS.py b/src/test/obj_ulog_size/TESTS.py old mode 100644 new mode 100755 diff --git a/src/test/obj_ulog_size/obj_ulog_size.c b/src/test/obj_ulog_size/obj_ulog_size.c index df81ab468de710a48dc9212dd56777acd9967529..77ab8961586507e041ef40d7990940b33b22528f 100644 --- a/src/test/obj_ulog_size/obj_ulog_size.c +++ b/src/test/obj_ulog_size/obj_ulog_size.c @@ -518,6 +518,89 @@ do_tx_max_alloc_tx_publish(PMEMobjpool *pop) } } +/* + * do_tx_buffer_overlapping -- checks if user buffer overlap detection works + */ +static void +do_tx_buffer_overlapping(PMEMobjpool *pop) +{ + UT_OUT("do_tx_buffer_overlapping"); + + /* changes verify_user_buffers value */ + int verify_user_buffers = 1; + int ret = pmemobj_ctl_set(pop, "tx.debug.verify_user_buffers", + &verify_user_buffers); + UT_ASSERTeq(ret, 0); + + PMEMoid oid = OID_NULL; + pmemobj_alloc(pop, &oid, MAX_ALLOC, 0, NULL, NULL); + UT_ASSERT(!OID_IS_NULL(oid)); + + char *ptr = (char *)pmemobj_direct(oid); + ptr = (char *)ALIGN_UP((size_t)ptr, CACHELINE_SIZE); + + TX_BEGIN(pop) { + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr + 256, 256); + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + } TX_ONABORT { + UT_ASSERT(0); + } TX_ONCOMMIT { + UT_OUT("Overlap not detected"); + } TX_END + + TX_BEGIN(pop) { + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr + 256, 256); + } TX_ONABORT { + UT_ASSERT(0); + } TX_ONCOMMIT { + UT_OUT("Overlap not detected"); + } TX_END + + TX_BEGIN(pop) { + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + } TX_ONABORT { + UT_OUT("Overlap detected"); + } TX_ONCOMMIT { + UT_ASSERT(0); + } TX_END + + TX_BEGIN(pop) { + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr + 128, 256); + } TX_ONABORT { + UT_OUT("Overlap detected"); + } TX_ONCOMMIT { + UT_ASSERT(0); + } TX_END + + TX_BEGIN(pop) { + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr + 128, 256); + pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT, + ptr, 256); + } TX_ONABORT { + UT_OUT("Overlap detected"); + } TX_ONCOMMIT { + UT_ASSERT(0); + } TX_END + + pmemobj_free(&oid); + verify_user_buffers = 0; + ret = pmemobj_ctl_set(pop, "tx.debug.verify_user_buffers", + &verify_user_buffers); + UT_ASSERTeq(ret, 0); +} + int main(int argc, char *argv[]) { @@ -547,6 +630,7 @@ main(int argc, char *argv[]) do_tx_max_alloc_tx_publish_abort(pop); do_tx_buffer_currently_used(pop); do_tx_max_alloc_tx_publish(pop); + do_tx_buffer_overlapping(pop); pmemobj_close(pop); pmemobj_close(pop2); diff --git a/src/test/obj_ulog_size/out0.log.match b/src/test/obj_ulog_size/out0.log.match index af41d5dee0a7ac5027b95df4193e63adcf2eea03..53bec6824a82e84823f0b97479ab4c2a7b627f72 100644 --- a/src/test/obj_ulog_size/out0.log.match +++ b/src/test/obj_ulog_size/out0.log.match @@ -19,4 +19,10 @@ do_tx_buffer_currently_used User cannot append the same undo log buffer twice: Invalid argument do_tx_max_alloc_tx_publish Can extend redo log with appended buffer +do_tx_buffer_overlapping +Overlap not detected +Overlap not detected +Overlap detected +Overlap detected +Overlap detected obj_ulog_size$(nW)TEST0: 
DONE diff --git a/src/test/obj_ulog_size/out1.log.match b/src/test/obj_ulog_size/out1.log.match index 19b4d70be71bd79426b9acd638e7df0590477e20..5f83e4f29a77605ac842e017b86c814faa7dabe7 100644 --- a/src/test/obj_ulog_size/out1.log.match +++ b/src/test/obj_ulog_size/out1.log.match @@ -19,4 +19,10 @@ do_tx_buffer_currently_used User cannot append the same undo log buffer twice: Invalid argument do_tx_max_alloc_tx_publish Can extend redo log with appended buffer +do_tx_buffer_overlapping +Overlap not detected +Overlap not detected +Overlap detected +Overlap detected +Overlap detected obj_ulog_size$(nW)TEST1: DONE diff --git a/src/test/obj_ulog_size/out2.log.match b/src/test/obj_ulog_size/out2.log.match index 7f7e5a4dd61f045e3dcb5345912b64d8446bc835..6c913a737424cdce45165fa9b454de4bde00be07 100644 --- a/src/test/obj_ulog_size/out2.log.match +++ b/src/test/obj_ulog_size/out2.log.match @@ -19,4 +19,10 @@ do_tx_buffer_currently_used User cannot append the same undo log buffer twice: Invalid argument do_tx_max_alloc_tx_publish Can extend redo log with appended buffer +do_tx_buffer_overlapping +Overlap not detected +Overlap not detected +Overlap detected +Overlap detected +Overlap detected obj_ulog_size$(nW)TEST2: DONE
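
Note on the tightened overlap check: operation_user_buffer_try_insert() now compares the candidate buffer against its neighbors with strict inequalities, so ranges are treated as half-open intervals and two buffers that merely touch (one ends exactly where the next begins) are accepted, which is what the first two transactions in do_tx_buffer_overlapping() verify. A minimal sketch of that predicate, with an illustrative struct range standing in for struct user_buffer_def:

	#include <stddef.h>

	/* illustrative stand-in for struct user_buffer_def */
	struct range {
		char *addr;
		size_t size;
	};

	/*
	 * Half-open intervals [addr, addr + size) overlap only if each
	 * one starts strictly before the other one ends; adjacent
	 * ranges do not count, mirroring the >/< comparisons above.
	 */
	static int
	ranges_overlap(const struct range *a, const struct range *b)
	{
		return a->addr + a->size > b->addr &&
		    b->addr + b->size > a->addr;
	}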
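Note on the alignment pass: operation_user_buffer_verify_align() aligns the buffer address up to a cacheline, charges the skipped bytes plus the struct ulog header against the size, aligns the remaining capacity down, and rejects the buffer unless more than one cacheline of capacity is left. A standalone sketch of the same arithmetic, assuming a 64-byte cacheline and a generic header_size in place of sizeof(struct ulog):

	#include <stdint.h>
	#include <stddef.h>

	#define CACHELINE 64	/* assumed value of CACHELINE_SIZE */

	static int
	align_user_buffer(void **addr, size_t *size, size_t header_size)
	{
		uintptr_t p = (uintptr_t)*addr;
		uintptr_t up = (p + CACHELINE - 1) &
		    ~(uintptr_t)(CACHELINE - 1);
		ptrdiff_t unaligned = (ptrdiff_t)*size -
		    (ptrdiff_t)(up - p) - (ptrdiff_t)header_size;

		/* the "Capacity insufficient" case in the patch */
		if (unaligned <= CACHELINE)
			return -1;

		size_t capacity = (size_t)unaligned &
		    ~(size_t)(CACHELINE - 1);

		*addr = (void *)up;		/* aligned up */
		*size = header_size + capacity;	/* aligned down */
		return 0;
	}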
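From the API side, the visible change is that an intent (redo) buffer appended with pmemobj_tx_log_append_buffer() is only verified and aligned at append time; it is attached to the external redo log later, in pmemobj_tx_commit(), because atomic operations may run independently in the same lane while the transaction is live. A hypothetical caller (not part of the patch), assuming pop is an open pool and scratch is a previously allocated object used as log space:

	#include <libpmemobj.h>

	static int
	tx_with_user_redo_log(PMEMobjpool *pop, PMEMoid scratch,
		size_t size)
	{
		/* optional: enable the overlap checks tested above */
		int verify = 1;
		if (pmemobj_ctl_set(pop, "tx.debug.verify_user_buffers",
				&verify) != 0)
			return -1;

		int ret = 0;
		TX_BEGIN(pop) {
			/* parked in tx->redo_userbufs until commit */
			pmemobj_tx_log_append_buffer(TX_LOG_TYPE_INTENT,
				pmemobj_direct(scratch), size);
		} TX_ONABORT {
			ret = -1;	/* overlap or capacity failure */
		} TX_END

		return ret;
	}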