/** ** Copyright (c) 2007-2010 Illumina, Inc. ** ** This software is covered by the "Illumina Genome Analyzer Software ** License Agreement" and the "Illumina Source Code License Agreement", ** and certain third party copyright/licenses, and any user of this ** source file is bound by the terms therein (see accompanying files ** Illumina_Genome_Analyzer_Software_License_Agreement.pdf and ** Illumina_Source_Code_License_Agreement.pdf and third party ** copyright/license notices). ** ** This file is part of the Consensus Assessment of Sequence And VAriation ** (CASAVA) software package. ** ** \file lib/common/File_Buffer_Impl.cpp ** ** \brief Implementation of the file buffer ** ** Implementation of the file buffer ** ** \author Richard Shaw **/ #include #include #include #include #include #include #include "config.h" #include "common/StringUtil.h" #include "File_Buffer_Impl.h" /*****************************************************************************/ bool open_file(File_Hndl& file_hndl, const std::string& file_path_str, const File_Hndl::File_Use& file_use) { bool ret_val(true); int bz_error(0); file_hndl.my_file_use = file_use; switch (file_hndl.my_compression_type) { case File_Buffer::compressionNone: ret_val = ((file_hndl.my_file_hndl = fopen(file_path_str.c_str(), ((File_Hndl::READ == file_use) ? "r" : "w"))) != 0); break; case File_Buffer::compressionGzip: #if defined(HAVE_ZLIB) file_hndl.my_gz_file_hndl = gzopen(file_path_str.c_str(), ((File_Hndl::READ == file_use) ? "rb" : "wb")); ret_val = (file_hndl.my_gz_file_hndl != 0); #else std::cerr << "ERROR: Attempting to open a gzip file in program built without zlib" << std::endl; return false; #endif break; case File_Buffer::compressionBzip2: #if defined(HAVE_BZLIB) file_hndl.my_file_hndl = fopen(file_path_str.c_str(), ((File_Hndl::READ == file_use) ? "r" : "w")); if (!(ret_val = (file_hndl.my_file_hndl != 0))) { break; } file_hndl.my_bz2_file_hndl = ((File_Hndl::READ == file_use) ? BZ2_bzReadOpen(&bz_error, file_hndl.my_file_hndl, 0, 0, NULL, 0) : BZ2_bzWriteOpen(&bz_error, file_hndl.my_file_hndl, 9, // blockSize100k 0, // verbosity 30)); // workFactor if (!(ret_val = (bz_error == BZ_OK))) { if (File_Hndl::READ == file_use) { BZ2_bzReadClose(&bz_error, file_hndl.my_bz2_file_hndl); } else { BZ2_bzWriteClose(&bz_error, file_hndl.my_bz2_file_hndl, 0, // abandon 0, // n_bytes_in_ptr 0); // n_bytes_out_ptr } fclose(file_hndl.my_file_hndl); break; } #else std::cerr << "ERROR: Attempting to open a bzip2 file in program built without bzlib" << std::endl; return false; #endif break; } return ret_val; } /*****************************************************************************/ bool close_file(File_Hndl& file_hndl) { bool ret_val(true); int bz_error(0); switch (file_hndl.my_compression_type) { case File_Buffer::compressionNone: ret_val = (fclose(file_hndl.my_file_hndl) == 0); break; case File_Buffer::compressionGzip: #if defined(HAVE_ZLIB) ret_val = (gzclose(file_hndl.my_gz_file_hndl) == 0); #else return false; #endif break; case File_Buffer::compressionBzip2: #if defined(HAVE_BZLIB) if (File_Hndl::READ == file_hndl.my_file_use) { BZ2_bzReadClose(&bz_error, file_hndl.my_bz2_file_hndl); } else { BZ2_bzWriteClose(&bz_error, file_hndl.my_bz2_file_hndl, 0, // abandon 0, // n_bytes_in_ptr 0); // n_bytes_out_ptr } ret_val = (bz_error == BZ_OK); if (!ret_val) { std::cerr << "ERROR: bz_error was not BZ_OK after close" << std::endl; } // Do the fclose even if BZ2_bzClose has failed. if (fclose(file_hndl.my_file_hndl) != 0) { std::cerr << "ERROR: Failed to close file hndl used for bz2 " << strerror(errno) << "." << std::endl; ret_val = false; } #else return false; #endif break; } return ret_val; } /*****************************************************************************/ // `char* buf_ptr' not `const char* buf_ptr' because the underlying // gzwrite and BZ2_bzWrite want a non-const buffer. unsigned int write_chunk(File_Hndl& file_hndl, char* buf_ptr, const unsigned int chunk_size) { assert(File_Hndl::READ != file_hndl.my_file_use); if (File_Hndl::READ == file_hndl.my_file_use) { return 0; } unsigned int num_items_written(0); switch (file_hndl.my_compression_type) { case File_Buffer::compressionNone: num_items_written = (unsigned int) fwrite(buf_ptr, sizeof(char), (size_t)(chunk_size), file_hndl.my_file_hndl); break; case File_Buffer::compressionGzip: #if defined(HAVE_ZLIB) { const int gz_ret_val = gzwrite(file_hndl.my_gz_file_hndl, buf_ptr, chunk_size); // may be 0 indicating error. num_items_written = static_cast(gz_ret_val); } #else return 0; #endif break; case File_Buffer::compressionBzip2: #if defined(HAVE_BZLIB) { int bz2_error(BZ_OK); BZ2_bzWrite(&bz2_error, file_hndl.my_bz2_file_hndl, buf_ptr, chunk_size); switch (bz2_error) { case BZ_OK: // BZ2_bzWrite does not return any count. num_items_written = chunk_size; break; default: // error num_items_written = 0; } } #else return 0; #endif break; } return num_items_written; } /*****************************************************************************/ unsigned int read_chunk(File_Hndl& file_hndl, char* buf_ptr, const unsigned int chunk_size, bool& at_eof) { assert(File_Hndl::WRITE != file_hndl.my_file_use); if (File_Hndl::WRITE == file_hndl.my_file_use) { return 0; } unsigned int num_items_read(0); at_eof = false; switch (file_hndl.my_compression_type) { case File_Buffer::compressionNone: num_items_read = (unsigned int) fread(buf_ptr, sizeof(char), (size_t)(chunk_size), file_hndl.my_file_hndl); if (num_items_read < chunk_size) { at_eof = feof(file_hndl.my_file_hndl); } break; case File_Buffer::compressionGzip: #if defined(HAVE_ZLIB) { const int gz_ret_val = gzread(file_hndl.my_gz_file_hndl, buf_ptr, chunk_size); if (gz_ret_val < 0) { // error num_items_read = 0; } else if (gz_ret_val == 0) { at_eof = true; num_items_read = 0; } else { num_items_read = static_cast(gz_ret_val); } } #else return 0; #endif break; case File_Buffer::compressionBzip2: #if defined(HAVE_BZLIB) { int bz2_error(BZ_OK); int bz2_ret_val = BZ2_bzRead(&bz2_error, file_hndl.my_bz2_file_hndl, buf_ptr, chunk_size); switch (bz2_error) { case BZ_OK: num_items_read = static_cast(bz2_ret_val); break; case BZ_STREAM_END: at_eof = true; num_items_read = static_cast(bz2_ret_val); break; default: // error num_items_read = 0; } } #else return 0; #endif break; } *(buf_ptr + num_items_read) = '\0'; return num_items_read; } /*****************************************************************************/ File_Buffer_Impl::File_Buffer_Impl(const std::string& file_path_str, File_Buffer::Compression_Type compression_type) : my_file_path_str(file_path_str), my_file_is_open(false), my_read_line_citer(my_read_line_vec.end()), my_eof_is_pending(false) { my_file_hndl.my_compression_type = compression_type; } /*****************************************************************************/ File_Buffer_Impl::~File_Buffer_Impl() { if (my_file_is_open) { close_file(my_file_hndl); my_file_is_open = false; } } /*****************************************************************************/ bool File_Buffer_Impl::write_some_lines(const File_Buffer::Line_Vec& line_vec) { if (my_file_hndl.my_file_use == File_Hndl::READ) { std::cerr << "ERROR: Attempted to write lines to file " << my_file_path_str << " opened for reading." << std::endl; return false; } if (!my_file_is_open) { if (!(my_file_is_open = open_file(my_file_hndl, my_file_path_str, File_Hndl::WRITE))) { std::cerr << "ERROR: Failed to open " << my_file_path_str << " for writing." << std::endl; return false; } } // Work out the required buffer size and allocate it. unsigned int chunk_size(0); for (File_Buffer::Line_Vec_CIter line_citer = line_vec.begin(); line_citer != line_vec.end(); ++line_citer) { // The extra 1 is for the NL (assumed Unix). chunk_size += (line_citer->size() + 1); } char* buf_ptr = new char[chunk_size]; if (!buf_ptr) { std::cerr << "ERROR: Out of memory for buffering output to " << my_file_path_str << std::endl; return false; } // Copy the lines into the buffer. char* ch_ptr(buf_ptr); for (File_Buffer::Line_Vec_CIter line_citer = line_vec.begin(); line_citer != line_vec.end(); ++line_citer) { const unsigned int curr_line_len(line_citer->size()); strncpy(ch_ptr, line_citer->c_str(), curr_line_len); ch_ptr += curr_line_len; // Assumes Unix NL. *ch_ptr++ = '\n'; } // Write the buffer. const unsigned int num_bytes_written = write_chunk(my_file_hndl, buf_ptr, chunk_size); // Free the buffer. // FIXME : keep memory allocated between calls and do not free until // end_file? delete[] buf_ptr; if (num_bytes_written < chunk_size) { std::cerr << "ERROR: Wrote only " << num_bytes_written << " bytes of " << chunk_size << " byte buffer to " << my_file_path_str << std::endl; } return (num_bytes_written == chunk_size); } /*****************************************************************************/ bool File_Buffer_Impl::write_buffer(char* buf_ptr, unsigned int num_bytes) { if (my_file_hndl.my_file_use == File_Hndl::READ) { std::cerr << "ERROR: Attempted to write buffer to file " << my_file_path_str << " opened for reading." << std::endl; return false; } if (!my_file_is_open) { if (!(my_file_is_open = open_file(my_file_hndl, my_file_path_str, File_Hndl::WRITE))) { std::cerr << "ERROR: Failed to open " << my_file_path_str << " for writing." << std::endl; return false; } } return (write_chunk(my_file_hndl, buf_ptr, num_bytes) == num_bytes); } /*****************************************************************************/ bool File_Buffer_Impl::end_file() { if (my_file_hndl.my_file_use == File_Hndl::READ) { std::cerr << "ERROR: Attempted to end writing to file " << my_file_path_str << " opened for reading." << std::endl; return false; } if (!my_file_is_open) { std::cerr << "ERROR: Attempted to end file " << my_file_path_str << " that is not open." << std::endl; return false; } if (!close_file(my_file_hndl)) { std::cerr << "ERROR: Failed to close file " << my_file_path_str << std::endl; // Set flag to false even if close is not fully successful. my_file_is_open = false; return false; } my_file_is_open = false; return true; } /*****************************************************************************/ bool File_Buffer_Impl::read_some_lines(File_Buffer::Line_Vec& line_vec, bool& at_eof) { if (my_file_hndl.my_file_use == File_Hndl::WRITE) { std::cerr << "ERROR: Attempted to read from file " << my_file_path_str << " opened for writing." << std::endl; return false; } // Just in case someone tries mixing read_line and read_some_lines. if (!my_read_line_vec.empty()) { line_vec = my_read_line_vec; my_read_line_vec.clear(); my_read_line_citer = my_read_line_vec.end(); at_eof = my_eof_is_pending; my_eof_is_pending = false; return true; } // Onto the main body. line_vec.clear(); // FIXME : Keep allocated and track count of lines. at_eof = false; if (!my_file_is_open) { if (!(my_file_is_open = open_file(my_file_hndl, my_file_path_str, File_Hndl::READ))) { std::cerr << "ERROR: Failed to open " << my_file_path_str << std::endl; return false; } } const unsigned int chunk_size(1000000); char buf[chunk_size + 1]; char* buf_ptr = &buf[0]; // Put any characters left over from last time back into the buffer. const unsigned int num_prefix_bytes(my_left_over_str.size()); strncpy(buf_ptr, my_left_over_str.c_str(), num_prefix_bytes); // my_left_over_str is cleared below after another potential use. const unsigned int num_bytes_to_read(chunk_size - num_prefix_bytes); const unsigned int num_bytes_read = read_chunk(my_file_hndl, buf_ptr + num_prefix_bytes, num_bytes_to_read, at_eof); if (at_eof || (num_bytes_read == 0)) { close_file(my_file_hndl); my_file_is_open = false; // whether the close succeeds or not } if (num_bytes_read == 0) { // Either EOF was hit (in which case at_eof should already have been // set true) or there was a real error (at_eof will be false). if (at_eof) { // No bytes were read this time because we hit EOF but a line // (for some reason without a terminating NL) was left over // from the last read. if (!my_left_over_str.empty()) { line_vec.push_back(my_left_over_str); my_left_over_str.clear(); return true; } } return false; } // Clear here after potential EOF use above. my_left_over_str.clear(); // Until EOF is reached, assume that if the last character read is // not NL, that the last line will be partial and needs to be cached // rather than returned. const bool have_bytes_left_over(*(buf_ptr + num_prefix_bytes + num_bytes_read - 1) != '\n'); typedef std::vector Cstr_Vec; typedef Cstr_Vec::const_iterator Cstr_Vec_CIter; Cstr_Vec line_cstr_vec; fastTokenizerInPlace(buf_ptr, '\n', line_cstr_vec); if (have_bytes_left_over) { my_left_over_str = *(line_cstr_vec.rbegin()); line_cstr_vec.pop_back(); } // Convert pointers into temporary buffer to STL strings. // FIXME : Would also need to be changed if line_vec was kept allocated. for (Cstr_Vec_CIter cstr_citer = line_cstr_vec.begin(); cstr_citer != line_cstr_vec.end(); ++cstr_citer) { line_vec.push_back(*cstr_citer); } return true; } /*****************************************************************************/ bool File_Buffer_Impl::read_line(std::string& line_str, bool& at_eof) { bool ret_val(true); at_eof = false; bool need_new_vec(false); if (my_read_line_vec.empty()) { need_new_vec = true; } else if (my_read_line_citer == my_read_line_vec.end()) { my_read_line_vec.clear(); need_new_vec = true; } if (need_new_vec) { // Nothing buffered, so attempt a read. if (!read_some_lines(my_read_line_vec, my_eof_is_pending)) { at_eof = my_eof_is_pending; my_read_line_citer = my_read_line_vec.end(); return false; } my_read_line_citer = my_read_line_vec.begin(); } line_str = *(my_read_line_citer++); if (my_read_line_citer == my_read_line_vec.end()) { at_eof = my_eof_is_pending; } return ret_val; } /*****************************************************************************/