/* * include/io/PagedFile.h * * Copyright (C) 2023 Douglas Rumbaugh * * All rights reserved. Published under the Modified BSD License. * */ #pragma once #define __GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include "util/types.h" #include "util/base.h" #define PF_COUNT_IO #ifdef PF_COUNT_IO #define INC_READ() de::pf_read_cnt++ #define INC_WRITE() de::pf_write_cnt++ #define RESET_IO_CNT() \ de::pf_read_cnt = 0; \ de::pf_write_cnt = 0 #else #define INC_READ() do {} while (0) #define INC_WRITE() do {} while (0) #define RESET_IO_CNT() do {} while (0) #endif namespace de { static thread_local size_t pf_read_cnt = 0; static thread_local size_t pf_write_cnt = 0; class PagedFileIterator; class PagedFile; static PagedFileIterator *create_pfile_iter(PagedFile *pfile, PageNum start_page=0, PageNum stop_page=0); class PagedFile { public: static PagedFile *create(const std::string fname, bool new_file=true) { auto flags = O_RDWR; mode_t mode = 0640; off_t size = 0; if (new_file) { flags |= O_CREAT | O_TRUNC; } int fd = open(fname.c_str(), flags, mode); if (fd == -1) { return nullptr; } if (new_file) { if(fallocate(fd, 0, 0, PAGE_SIZE)) { return nullptr; } size = PAGE_SIZE; } else { struct stat buf; if (fstat(fd, &buf) == -1) { return nullptr; } size = buf.st_size; } if (fd) { return new PagedFile(fd, fname, size, mode); } return nullptr; } ~PagedFile() { if (m_file_open) { close(m_fd); } } /* * Add new_page_count new pages to the file in bulk, and returns the * PageId of the first page in the new range. * * If the allocation fails, returns INVALID_PID. Also returns INVALID_PID * if bulk allocation is not supported by the implementation. This can be * iter1. checked via the supports_allocation method. */ PageNum allocate_pages(PageNum count=1) { PageNum new_first = get_page_count() + 1; size_t alloc_size = count * PAGE_SIZE; if (raw_allocate(alloc_size)) { return new_first; } return INVALID_PNUM; } /* * Reads data from the specified page into a buffer pointed to by * buffer_ptr. It is necessary for buffer_ptr to be parm::SECTOR_SIZE * aligned, and also for it to be large enough to accommodate * parm::PAGE_SIZE chars. If the read succeeds, returns 1. Otherwise * returns 0. The contents of the input buffer are undefined in the case of * an error. */ int read_page(PageNum pnum, char *buffer_ptr) { return (check_pnum(pnum)) ? raw_read(buffer_ptr, PAGE_SIZE, PagedFile::pnum_to_offset(pnum)) : 0; } /* * Reads several pages into associated buffers. It is necessary for the * buffer referred to by each pointer to be parm::SECTOR_SIZE aligned and * large enough to accommodate parm::PAGE_SIZE chars. If possible, * vectorized IO may be used to read adjacent pages. If the reads succeed, * returns 1. If a read fails, returns 0. The contents of all the buffers * are undefined in the case of an error. */ int read_pages(std::vector> pages) { if (pages.size() == 0) { return 0; } if (pages.size() == 1) { read_page(pages[0].first, pages[0].second); } std::sort(pages.begin(), pages.end()); PageNum range_start = pages[0].first; PageNum prev_pnum = range_start; std::vector buffers; buffers.push_back(pages[0].second); for (size_t i=1; i %s\n", m_fname.c_str(), new_fname.c_str()); perror("IN RENAME:"); assert(false); } m_fname = new_fname; } private: PagedFile(int fd, std::string fname, off_t size, mode_t mode) { m_file_open = true; m_fd = fd; m_fname = fname; m_size = size; m_mode = mode; m_flags = O_RDWR | O_DIRECT; } static off_t pnum_to_offset(PageNum pnum) { return pnum * PAGE_SIZE; } bool check_pnum(PageNum pnum) const { return pnum != INVALID_PNUM && pnum < (get_file_size() / PAGE_SIZE); } int raw_read(char *buffer, off_t amount, off_t offset) { if (!verify_io_parms(amount, offset)) { return 0; } if (pread(m_fd, buffer, amount, offset) != amount) { return 0; } INC_READ(); return 1; } int raw_readv(std::vector buffers, off_t buffer_size, off_t initial_offset) { size_t buffer_cnt = buffers.size(); off_t amount = buffer_size * buffer_cnt; if (!verify_io_parms(amount, initial_offset)) { return 0; } auto iov = new iovec[buffer_cnt]; for (size_t i=0; i m_size) { return false; } if (amount % SECTOR_SIZE != 0) { return false; } if (offset % SECTOR_SIZE != 0) { return false; } return true; } int m_fd; bool m_file_open; off_t m_size; mode_t m_mode; std::string m_fname; int m_flags; }; class PagedFileIterator { public: PagedFileIterator(PagedFile *pfile, PageNum start_page = 0, PageNum stop_page = 0) : m_pfile(pfile), m_current_pnum((start_page == INVALID_PNUM) ? 0 : start_page - 1), m_start_pnum(start_page), m_stop_pnum(stop_page), m_buffer((char *)aligned_alloc(SECTOR_SIZE, PAGE_SIZE)) {} bool next() { while (m_current_pnum < m_stop_pnum) { if (m_pfile->read_page(++m_current_pnum, m_buffer)) { return true; } // IO error of some kind return false; } // no more pages to read return false; } char *get_item() { return m_buffer; } ~PagedFileIterator() { free(m_buffer); } private: PagedFile *m_pfile; PageNum m_current_pnum; PageNum m_start_pnum; PageNum m_stop_pnum; char *m_buffer; }; static PagedFileIterator *create_pfile_iter(PagedFile *pfile, PageNum start_page, PageNum stop_page) { return new PagedFileIterator(pfile, start_page, stop_page); } }