From 24aad22d0f769130fbc17c8ab0bc4f9aff79df08 Mon Sep 17 00:00:00 2001 From: solsta <segorov555@gmail.com> Date: Fri, 25 Mar 2022 12:28:19 +0000 Subject: [PATCH] Added pmdk transactions --- main.c | 99 +++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/main.c b/main.c index a9ee4ed..32fb250 100644 --- a/main.c +++ b/main.c @@ -6,6 +6,7 @@ #include <string.h> #include <malloc.h> #include <assert.h> +#include <libpmemobj/tx_base.h> typedef struct assert assert; struct ring_buffer{ @@ -30,6 +31,12 @@ struct cmd_and_next_index{ int next_index; }; +static void +log_stages(PMEMobjpool *pop_local, enum pobj_tx_stage stage, void *arg) +{ + /* Commenting this out because this is not required during normal execution. */ + /* dr_fprintf(STDERR, "cb stage: ", desc[stage], " "); */ +} unsigned int get_free_slots_left_in_the_buffer(struct ring_buffer *rb){ /* There is no scenario where this can happen other than on the first iteration */ @@ -48,7 +55,7 @@ void print_available_buffer_slots(struct ring_buffer *rb){ printf("There are %d free slot/s in the buffer\n", get_free_slots_left_in_the_buffer(rb)); } -int insert_in_to_buffer(struct ring_buffer *rb, char *command){ +int insert_in_to_buffer(PMEMobjpool *pop, struct ring_buffer *rb, char *command){ unsigned int command_size = strlen(command); int cmd_info_size = sizeof (struct varied_length_command_info); int required_length = command_size+cmd_info_size; @@ -71,36 +78,33 @@ int insert_in_to_buffer(struct ring_buffer *rb, char *command){ int reminder = required_length-remaining_space; void *addr_of_buffer = &rb->buffer[rb->head]; void *addr_of_entry = full_cmd; - memcpy(addr_of_buffer, addr_of_entry, remaining_space); + pmemobj_memcpy_persist(pop,addr_of_buffer, addr_of_entry, remaining_space); printf("First memcopy worked\n"); addr_of_buffer = &rb->buffer[0]; addr_of_entry+=remaining_space; - memcpy(addr_of_buffer, addr_of_entry, reminder); + pmemobj_memcpy_persist(pop,addr_of_buffer, addr_of_entry, reminder); printf("Second memcopy worked\n"); + pmemobj_tx_add_range_direct(&rb->head, sizeof (rb->head)); rb->head = reminder; free(full_cmd); return 0; } else { - printf("Normal case insertion\n"); - printf("Inserting %s\n", command); - /* TODO wrap inside a transaction */ + struct varied_length_command_info *cmd_info = malloc(sizeof (struct varied_length_command_info)); cmd_info->command_lenght = strlen(command); - - memcpy(&rb->buffer[rb->head], cmd_info, sizeof(struct varied_length_command_info)); - printf("Written command with lenght as : %d, head is currently at : %d\n", cmd_info->command_lenght, rb->head); + pmemobj_memcpy_persist(pop, &rb->buffer[rb->head], cmd_info, sizeof(struct varied_length_command_info)); void *rb_addr = &rb->buffer[rb->head]; char *command_slot = rb_addr + sizeof(struct varied_length_command_info); - memcpy(command_slot, command, strlen(command)+1); - printf("Written to slot: %s\n", command_slot); + pmemobj_memcpy_persist(pop,command_slot, command, strlen(command)+1); + pmemobj_tx_add_range_direct(&rb->head, sizeof(rb->head)); rb->head += sizeof(struct varied_length_command_info) + strlen(command); return 0; } } -int insert(struct ring_buffer *rb, char *command){ +int insert(PMEMobjpool *pop, struct ring_buffer *rb, char *command){ if(strlen (command) + sizeof (struct varied_length_command_info) >= rb->size){ printf("No space to insert this entry, dropping!\n"); return 1; @@ -109,17 +113,26 @@ int insert(struct ring_buffer *rb, char *command){ printf("No space to insert this entry, dropping!\n"); return 1; } - insert_in_to_buffer(rb, command); - rb->number_of_commands+=1; - return 0; + pmemobj_tx_begin(pop, NULL, TX_PARAM_CB, log_stages, NULL, + TX_PARAM_NONE); + insert_in_to_buffer(pop,rb, command); + pmemobj_tx_add_range_direct(&rb->number_of_commands, sizeof (rb->number_of_commands)); + rb->number_of_commands+=1; + pmemobj_tx_commit(); + pmemobj_tx_end(); + return 0; } -struct cmd_and_next_index *retrieve_command_at_index(struct ring_buffer *rb, int index){ +struct cmd_and_next_index *retrieve_command_at_index(PMEMobjpool *pop, struct ring_buffer *rb, int index){ if(rb->number_of_commands == 0){ printf("Buffer is empty\n"); return NULL; } + pmemobj_tx_begin(pop, NULL, TX_PARAM_CB, log_stages, NULL, + TX_PARAM_NONE); + pmemobj_tx_add_range_direct(&rb->number_of_commands, sizeof (rb->number_of_commands)); + pmemobj_tx_add_range_direct(&rb->tail, sizeof (rb->tail)); rb->number_of_commands-=1; //First check if current index fits the size of the info block int space_left_in_this_turn = rb->real_size-index; @@ -139,7 +152,6 @@ struct cmd_and_next_index *retrieve_command_at_index(struct ring_buffer *rb, int printf("Copying %d bytes to cmd_info\n", remainder); memcpy(cmd_addr_ptr, start_addr, remainder); printf("Command length according to cmd info: %d\n", cmd_info->command_lenght); - int cmd_l = cmd_info->command_lenght; void *cmd_start_addr = &rb->buffer[0]+remainder; char *extracted_command = malloc(cmd_info->command_lenght); struct cmd_and_next_index *data = malloc(sizeof (struct cmd_and_next_index)); @@ -149,6 +161,8 @@ struct cmd_and_next_index *retrieve_command_at_index(struct ring_buffer *rb, int memcpy(extracted_command, cmd_start_addr, cmd_info->command_lenght); free(cmd_info); rb->tail = index; + pmemobj_tx_commit(); + pmemobj_tx_end(); return data; } else { @@ -172,6 +186,8 @@ struct cmd_and_next_index *retrieve_command_at_index(struct ring_buffer *rb, int data->command = extracted_command; data->next_index = cmd_info->command_lenght-space_left_for_command; rb->tail = index; + pmemobj_tx_commit(); + pmemobj_tx_end(); return data; } else{ printf("Normal case extraction\n"); @@ -184,6 +200,8 @@ struct cmd_and_next_index *retrieve_command_at_index(struct ring_buffer *rb, int data->command = extracted_command; data->next_index = index+sizeof (struct varied_length_command_info) + cmd_info->command_lenght; rb->tail = index; + pmemobj_tx_commit(); + pmemobj_tx_end(); return data; } } @@ -222,19 +240,36 @@ void print_commands_in_the_buffer(struct ring_buffer *rb){ struct ring_buffer *initialise_ring_buffer(PMEMobjpool *pop){ struct ring_buffer *rb = initialise_ring_buffer_on_persistent_memory(pop); + pmemobj_tx_begin(pop, NULL, TX_PARAM_CB, log_stages, NULL, + TX_PARAM_NONE); + pmemobj_tx_add_range_direct(&rb->size, sizeof (rb->size)); + pmemobj_tx_add_range_direct(&rb->real_size, sizeof (rb->real_size)); + pmemobj_tx_add_range_direct(&rb->head, sizeof (rb->head)); + pmemobj_tx_add_range_direct(&rb->tail, sizeof (rb->tail)); + pmemobj_tx_add_range_direct(&rb->number_of_commands, sizeof (rb->number_of_commands)); + rb->size =sizeof(rb->buffer); rb->real_size = rb->size-1; rb->head = 0; rb->tail = 0; rb->number_of_commands = 0; printf("Initialized ring_buffer\n"); + pmemobj_tx_commit(); + pmemobj_tx_end(); return rb; } -void reset_ring_buffer(struct ring_buffer *rb){ +void reset_ring_buffer(PMEMobjpool *pop, struct ring_buffer *rb){ + pmemobj_tx_begin(pop, NULL, TX_PARAM_CB, log_stages, NULL, + TX_PARAM_NONE); + pmemobj_tx_add_range_direct(&rb->head, sizeof (rb->head)); + pmemobj_tx_add_range_direct(&rb->tail, sizeof (rb->tail)); + pmemobj_tx_add_range_direct(&rb->number_of_commands, sizeof (rb->number_of_commands)); rb->head = 0; rb->tail = 0; rb->number_of_commands = 0; + pmemobj_tx_commit(); + pmemobj_tx_end(); } int main() { @@ -247,22 +282,22 @@ int main() { struct cmd_and_next_index *cmd_data; /* This must fail because command is too long */ - assert( insert(rb, "foo bar foo bar foo bar foo bar") == 1); + assert( insert(pop, rb, "foo bar foo bar foo bar foo bar") == 1); /* Testing inserting more than buffer can accomodate for */ - assert(insert(rb, "abcdefghij") == 0); - assert(insert(rb, "abcdefghij") == 0); - assert(insert(rb, "abcdefghij") == 1); - reset_ring_buffer(rb); + assert(insert(pop,rb, "abcdefghij") == 0); + assert(insert(pop,rb, "abcdefghij") == 0); + assert(insert(pop,rb, "abcdefghij") == 1); + reset_ring_buffer(pop, rb); /* Testing a reset scenario */ - assert(insert(rb, "abcdef") == 0); - assert(insert(rb, "abcdef") == 0); - assert(insert(rb, "abcdef") == 1); - assert(insert(rb, "abcdefg") == 1); - assert(insert(rb, "abcde") == 0); + assert(insert(pop,rb, "abcdef") == 0); + assert(insert(pop,rb, "abcdef") == 0); + assert(insert(pop,rb, "abcdef") == 1); + assert(insert(pop,rb, "abcdefg") == 1); + assert(insert(pop,rb, "abcde") == 0); assert(rb->head == rb->real_size); - reset_ring_buffer(rb); + reset_ring_buffer(pop,rb); char seen_head_positions[rb->real_size]; char seen_tail_positions[rb->real_size]; @@ -276,14 +311,14 @@ int main() { seen_head_positions[rb->head] = '1'; seen_tail_positions[rb->tail] = '1'; - insert(rb, "foo bar"); + insert(pop,rb, "foo bar"); assert(rb->head < rb->size); assert(rb->tail < rb->size); seen_head_positions[rb->head] = '1'; seen_tail_positions[rb->tail] = '1'; - cmd_data = retrieve_command_at_index(rb, cmd_index); + cmd_data = retrieve_command_at_index(pop,rb, cmd_index); assert(strcmp(cmd_data->command, "foo bar") == 0); cmd_index = cmd_data->next_index; } @@ -294,7 +329,7 @@ int main() { } /* This must return NULL if the process tries to retrieve a command which does not exist */ - assert(retrieve_command_at_index(rb,cmd_index) == NULL); + assert(retrieve_command_at_index(pop, rb,cmd_index) == NULL); pmemobj_close(pop); -- GitLab