Manitou-Mail logo title

Source file: src/db.cpp

/* Copyright (C) 2004-2012 Daniel Verite

   This file is part of Manitou-Mail (see http://www.manitou-mail.org)

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License version 2 as
   published by the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place - Suite 330,
   Boston, MA 02111-1307, USA.
*/

#include "main.h"

#include "db.h"
#include <stdio.h>
#include <iostream>

#include <QByteArray>
#include <QMessageBox>
#include <QSocketNotifier>

#include "database.h"
#include "sqlstream.h"
#include "sqlquery.h"
#include "addresses.h"
#include "db_listener.h"

//static PGconn *pgconn;
pgConnection pgDb;

PGconn* GETDB()
{
  //  return pgconn;
  return pgDb.connection();
}

void DBEXCPT(PGconn* c)
{
  //  std::cerr << PQerrorMessage(c);
  QString err=PQerrorMessage(c);
  QMessageBox::warning(NULL, QObject::tr("Database error"), err);
}

void DBEXCPT(db_excpt& p)
{
  //  std::cerr << p.query() << ":" << p.errmsg() << std::endl;
  QString err=p.query();
  err+=":\n";
  err+=p.errmsg();
  QMessageBox::warning(NULL, QObject::tr("Database error"), err);
}

db_excpt::db_excpt(const QString query,
		   const QString msg,
		   QString code/*=QString::null*/)
{
  m_query=query;
  m_err_msg=msg;
  m_err_code=code;
  DBG_PRINTF(3, "db_excpt: query='%s', err='%s'", m_query.toLocal8Bit().constData(), m_err_msg.toLocal8Bit().constData());
}

db_excpt::db_excpt(const QString query, db_cnx& d)
{
  m_query=query;
  char* pg_msg = PQerrorMessage(d.connection());
  if (pg_msg!=NULL)
    m_err_msg = QString::fromUtf8(pg_msg);
}

int
ConnectDb(const char* cnx_string, QString* errstr)
{
  try {
    pgDb.logon(cnx_string);
    db_cnx::set_connect_string(cnx_string);
    db_cnx db;
    sql_stream s("SELECT current_database()", db);
    if (!s.eos()) {
      QString dbname;
      s >> dbname;
      db_cnx::set_dbname(dbname);
    }
  }
  catch(db_excpt& p) {
    QByteArray errmsg_bytes = p.errmsg().toLocal8Bit();    
    std::cerr << errmsg_bytes.constData() << std::endl;
    if (errstr)
      *errstr = p.errmsg();
    return 0;
  }
  return 1;
}

void // static
db_cnx::disconnect_all()
{
  // close secondary connections
  std::list<db_cnx_elt*>::iterator it=m_cnx_list.begin();
  for (; it!=m_cnx_list.end(); it++) {
    if ((*it)->m_connected) {
      (*it)->m_db->logoff();
      (*it)->m_connected=false;
    }
  }
}

void
DisconnectDb()
{
  // close primary connection
  pgDb.logoff();
  db_cnx::disconnect_all();
}

database::database() : m_open_trans_count(0)
{
}

database::~database()
{
}

void database::begin_transaction()
{
  m_open_trans_count++;
}

void database::commit_transaction()
{
}

void database::rollback_transaction()
{
}

int database::open_transactions_count() const
{
  return m_open_trans_count;
}

void
pgConnection::add_listener(db_listener* listener)
{
  m_listeners.append(listener);
  QString s = "LISTEN " + listener->notification_name();
  QByteArray ba = s.toUtf8();
  PQexec(m_pgConn, ba.constData());
}

void
pgConnection::remove_listener(db_listener* listener)
{
  int i=m_listeners.indexOf(listener);
  if (i>=0) {
    QString s = "UNLISTEN " + listener->notification_name();
    QByteArray ba = s.toUtf8();
    PQexec(m_pgConn, ba.constData());
    m_listeners.removeAt(i);
  }
}

// static data members
bool db_cnx::m_initialized;
std::list<db_cnx_elt*> db_cnx::m_cnx_list;
QMutex db_cnx::m_mutex;
QString db_cnx::m_connect_string;
QString db_cnx::m_dbname;

// static
void
db_cnx::set_connect_string(const char* cnx)
{
  m_connect_string = cnx;
}

// static
void
db_cnx::set_dbname(const QString dbname)
{
  m_dbname = dbname;
}

// static
const QString&
db_cnx::dbname()
{
  return m_dbname;
}


/* idle(): Return false if at least one non-primary connection is in
   use, meaning that we're probably running a query in a sub-thread.
   Should be called to avoid hitting the db with multiple simultaneous
   queries whenever possible */
// static
bool
db_cnx::idle()
{
  // TODO: do we need to use m_mutex here?
  std::list<db_cnx_elt*>::iterator it=m_cnx_list.begin();
  for (; it!=m_cnx_list.end(); it++) {
    if (!(*it)->m_available)
      return false;
  }
  return true;
}


db_cnx::~db_cnx()
{
  if (m_cnx) {
    std::list<db_cnx_elt*>::iterator it=m_cnx_list.begin();
    for (; it!=m_cnx_list.end(); it++) {
      if ((*it)->m_db == m_cnx) {
	(*it)->m_available=true;
      }
    }
  }
}

db_cnx::db_cnx(bool other_thread)
{
  if (!other_thread) {
    // just use the main connection for the main thread
    m_cnx=&pgDb;
    return;
  }

  const int max_cnx=5;
  m_mutex.lock();
  if (!m_initialized) {
    for (int i=0; i<max_cnx; i++) {
      db_cnx_elt* p = new db_cnx_elt;
      m_cnx_list.push_back(p);
    }
    m_initialized=true;
  }

  std::list<db_cnx_elt*>::iterator it = m_cnx_list.begin();
  for (; it!=m_cnx_list.end(); it++) {
    if ((*it)->m_available) {
      pgConnection* p;
      if (!(*it)->m_connected) {
	p = new pgConnection;
	DBG_PRINTF(4, "Opening a new database connection");
	(*it)->m_db = p;
	p->logon(m_connect_string.toLocal8Bit().constData());
	(*it)->m_connected=true;
      }
      (*it)->m_available=false;
      m_cnx = dynamic_cast<pgConnection*>((*it)->m_db); // FIXME??
      break;
    }
  }
  m_mutex.unlock();

  m_alerts_enabled=true;

#if 0
  {
    // DEBUG
    QString state;
    int i=0;
    std::list<db_cnx_elt*>::iterator it2 = m_cnx_list.begin();
    for (; it2!=m_cnx_list.end(); ++it2,++i) {
      state.append(QString("\nconnection %1 connected:%2 available:%3").arg(i).arg((*it2)->m_connected).arg((*it2)->m_available));
    }
    DBG_PRINTF(3, "Connections: %s", state.toLocal8Bit().constData());
  }
#endif

  if (it==m_cnx_list.end()) {
    DBG_PRINTF(2, "No database connection found");
    throw db_excpt(NULL, QObject::tr("The %1 database connections are already in use.").arg(max_cnx));
  }
}


int
pgConnection::logon(const char* conninfo)
{
  m_pgConn = PQconnectdb(conninfo);
  if (!m_pgConn) {
    throw db_excpt("connect", "not enough memory");
  }
  DBG_PRINTF(5,"logon m_pgConn=0x%p", m_pgConn);
  if (PQstatus(m_pgConn) == CONNECTION_BAD) {
    throw db_excpt("connect", PQerrorMessage(m_pgConn));
  }

  /* If the user has set PGCLIENTENCODING in its environment, then we decide
     to do no translation behind the postgresql client layer, since we
     assume that the user knows what he's doing. In the future, we may
     decide for a fixed encoding (that would be unicode/utf8, most
     probably) and override PGCLIENTENCODING. */
  if (!getenv("PGCLIENTENCODING")) {
    PGresult* res=PQexec(m_pgConn, "SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname=current_database()");
    if (res && PQresultStatus(res)==PGRES_TUPLES_OK) {
      const char* enc=(const char*)PQgetvalue(res,0,0);
      // pgsql versions before 8.1 return 'UNICODE', >=8.1 return 'UTF8'
      // we keep UTF8
      if (!strcmp(enc,"UNICODE"))
	enc="UTF8";
      set_encoding(enc);
    }
    if (res)
      PQclear(res);
  }
  PQexec(m_pgConn, "SET standard_conforming_strings=on");

  if (this==&pgDb)
    m_notifier = new pg_notifier(this);
 
  return 1;
}

void
pgConnection::logoff()
{
  if (m_pgConn) {
    if (m_notifier)
      delete m_notifier;
    
    PQfinish(m_pgConn);
    m_pgConn=NULL;
  }
}

bool
pgConnection::reconnect()
{
  DBG_PRINTF(3, "pgConnection::reconnect()");
  if (m_pgConn) {
    PQreset(m_pgConn);
    if (PQstatus(m_pgConn)!=CONNECTION_OK)
      return false;
  }
  for (int i=0; i<m_listeners.size(); i++) {
    /* Reinitialize listeners. It is necessary if the db backend
       process went down, and if the socket changed */
    db_listener* l = m_listeners.at(i);
    QString stmt = "LISTEN " + l->notification_name();
    PQexec(m_pgConn, stmt.toUtf8().constData());
  }
  return true;
}

bool
pgConnection::ping()
{
  if (!m_pgConn)
    return false;

  if (open_transactions_count() > 0) {
    /* We don't test the connection when inside a transaction because
       if the transaction is in a unusable state, any SQL will fail */
    return true;
  }

  PGresult* res = PQexec(m_pgConn, "SELECT 1");
  if (res) {
    bool ret = (PQresultStatus(res)==PGRES_TUPLES_OK);
    PQclear(res);
    return ret;
  }
  else
    return false;
}

QString
pgConnection::escape_string_literal(const QString src)
{
  char local_buf[2048]="\0";
  char* to=local_buf;
  int error;
  QByteArray arr = src.toUtf8();
  size_t length = arr.size();
  if (length*4 > sizeof(local_buf)) {
    to = (char*)malloc(length*4);
  }
  PQescapeStringConn(m_pgConn, to, arr.constData(), length, &error);
  // TODO: throw an exception if an error occurred
  QString out = QString::fromUtf8(to);
  if (to!=local_buf)
    free(to);
  return out;
}

#if 0
db_transaction::db_transaction(database& db): m_commit_done(false)
{
  m_pDb=&db;
  db.begin_transaction();
}
#endif

db_transaction::db_transaction(db_cnx& db): m_commit_done(false)
{
  m_pDb=&db;
  db.begin_transaction();
}

db_transaction::~db_transaction()
{
  if (m_pDb->datab()->open_transactions_count()==1 && !m_commit_done)
    m_pDb->rollback_transaction();
//  m_pDb->datab()->end_transaction();
}

void
db_transaction::commit()
{
  m_commit_done=true;
  if (m_pDb->datab()->open_transactions_count()==1) {
    m_pDb->commit_transaction();
  }
}

void
db_transaction::rollback()
{
  if (m_pDb->datab()->open_transactions_count()==1) {
    m_pDb->rollback_transaction();
  }
}

void
database::end_transaction()
{
  m_open_trans_count--;
  DBG_PRINTF(7, "m_open_trans_count=%d", m_open_trans_count);
  if (m_open_trans_count<0) {
    fprintf(stderr, "Fatal error: m_open_trans_count<0\n");
    exit(1);
  }
}

bool
db_cnx::next_seq_val(const char* seqName, int* id)
{
  try {
    QString query = QString("SELECT nextval('%1')").arg(seqName);
    sql_stream s(query, *this);
    s >> *id;
  }
  catch(db_excpt& p) {
    DBEXCPT(p);
    return false;
  }
  return true;
}

bool
db_cnx::next_seq_val(const char* seqName, unsigned int* id)
{
  try {
    QString query = QString("SELECT nextval('%1')").arg(seqName);
    sql_stream s(query, *this);
    s >> *id;
  }
  catch(db_excpt& p) {
    DBEXCPT(p);
    return false;
  }
  return true;
}

void
db_cnx::begin_transaction()
{
  m_cnx->begin_transaction();
  // don't use nested transactions
  DBG_PRINTF(7,"open_transactions_count()=%d", datab()->open_transactions_count());
  if (datab()->open_transactions_count()==1) {
    sql_stream s("BEGIN", *this);
  }
}

void
db_cnx::commit_transaction()
{
  end_transaction();
  if (datab()->open_transactions_count()==0) {
    sql_stream s("COMMIT", *this);
  }
}

void
db_cnx::rollback_transaction()
{
  end_transaction();
  if (datab()->open_transactions_count()==0) {
    sql_stream s("ROLLBACK", *this);
  }
}

void
db_cnx::handle_exception(db_excpt& e)
{
  if (m_alerts_enabled) {
    DBEXCPT(e);
  }
}

bool
db_cnx::ping()
{
  return m_cnx->ping();
}

QString
db_cnx::escape_string_literal(const QString str)
{
  return m_cnx->escape_string_literal(str);
}

pg_notifier::pg_notifier(pgConnection* cnx)
{
  m_pgcnx = cnx;
  PGconn* c = cnx->connection();
  int socket = PQsocket(c);
  if (socket != -1) {
    DBG_PRINTF(6, "Instantiate a new QSocketNotifier on fd=%d", socket);
    m_socket_notifier = new QSocketNotifier(socket, QSocketNotifier::Read);
    connect(m_socket_notifier, SIGNAL(activated(int)), this, SLOT(process_notification()));
  }
  else {
    DBG_PRINTF(1, "could not get db socket: PQsocket returned -1");
  }  
}

pg_notifier::~pg_notifier()
{
  if (m_socket_notifier)
    delete m_socket_notifier;
}

void
pg_notifier::process_notification()
{
  PGconn* c = m_pgcnx->connection();
  DBG_PRINTF(6, "process_notification() on socket %d", PQsocket(c));
  int r=PQconsumeInput(c);
  if (r==0) {
    DBG_PRINTF(6, "PQconsumeInput returns 0");
  }
  else {
    PGnotify* n;
    while ((n=PQnotifies(c))!=NULL) {
      DBG_PRINTF(6, "received db notify for %s", n->relname);
      for (int i=0; i<m_pgcnx->m_listeners.count(); i++) {
	db_listener* l = m_pgcnx->m_listeners.at(i);
	if (n->relname == l->notification_name()) {
	  l->process_notification();
	}
      }
      PQfreemem(n);
    }
  }
}

std::list<QString>
ReferencesList(const QString &s)
{
  std::list<QString> l;
  uint nLen=s.length();
  uint nPos=0;

  while (nPos<nLen) {
    int nStart=s.indexOf('<', nPos);
    if (nStart!=-1) {
      int endPos=s.indexOf('>',nStart);
      if (endPos!=-1) {
	l.push_back(s.mid(nStart+1,endPos-nStart-1));
	nPos=endPos+1;
      }
      else
	nPos=nLen;
    }
    else
      nPos=nLen;
  }
  return l;
}



HTML source code generated by GNU Source-Highlight plus some custom post-processing

List of all available source files