Implementing a simple database semaphore

In this article I will describe a simple technique that enables you to implement an “exclusive lock” in a distributed, even clustered, environment.

The technique works with all major databases, and should work with any database that has a reasonable implementation of the isolation level “READ COMMITTED”.

It works with PostgreSQL, DB2, Oracle and Apache Derby (JavaDB) and possibly others.

The technique can be used to serialize access to a specific entity or object graph in a system, where several application servers share a common database.

I first saw this technique implemented in a system that received about a million XML documents over a couple of hours. Each document had to be transformed and processed, and the corresponding database tables updated.

Three application servers were dedicated to the task of handling the incoming documents.

Each document had a unique ID that was generated from certain key information contained within the document.

Incoming documents were complete in that each would contain a “recent” copy of all document information. If the document was not registered, it would be created. If the document had a newer timestamp than the registered one, it would be updated.

The same document could enter the system in different versions at any point in time; there was no guaranteed ordering.

So a mechanism was needed to ensure that a specific document was not handled by more than one transaction at a time.

An “exclusive lock” was needed, and it had to work with documents that were not registered in the system yet.

How

All you need is a table with a unique index and transactions with isolation level “READ COMMITTED” or higher.

The transactions need not be J[2]EE CMT or anything fancy; just plain old JDBC connections with autocommit=false can do the trick.

In this sample we create a business object lock – an exclusive lock that can be put on any business object. A business object is identified by the name of the entity (e.g. “BondBO” for the Bond Business Object) and the object’s unique ID (e.g. the Bond’s ISIN): “BondBO:DK0015966592”. (Note that DK0015966592 is not actually the ISIN of a bond.)
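The key format described above could be built by a small helper. This is only a sketch: the class name `LockKeys`, the method `lockKey` and the validation rules are assumptions made for illustration, and the 80-character limit is assumed to match the `id VARCHAR(80)` column of the lock table used in this article.

```java
final class LockKeys {
    /** Assumed to match the id VARCHAR(80) column of the lock table. */
    static final int MAX_LENGTH = 80;

    private LockKeys() {
    }

    /** Builds "entityName:objectId", e.g. "BondBO:DK0015966592". */
    static String lockKey(String entityName, String objectId) {
        if (entityName == null || entityName.isEmpty()
                || objectId == null || objectId.isEmpty()) {
            throw new IllegalArgumentException("entity name and object ID must be non-empty");
        }
        if (entityName.indexOf(':') >= 0) {
            // Keep the separator unambiguous so two different objects cannot share a key
            throw new IllegalArgumentException("entity name must not contain ':'");
        }
        String key = entityName + ":" + objectId;
        if (key.length() > MAX_LENGTH) {
            // Fail early instead of letting the database reject or truncate the value
            throw new IllegalArgumentException("lock key longer than " + MAX_LENGTH + " characters");
        }
        return key;
    }
}
```

Building the key in one place keeps every application server locking on exactly the same string for the same business object.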

Below is DDL for a business object lock table:

CREATE TABLE businessobjectlock
(
  uuid VARCHAR(32) NOT NULL,
  id VARCHAR(80) NOT NULL,
  locked_ts TIMESTAMP NOT NULL,
  CONSTRAINT pk_businessobjectlock PRIMARY KEY (uuid),
  CONSTRAINT id_businessobjectlock UNIQUE (id)
);

When one transaction (tx-A) inserts a row in this table with a specific id (the business object ID), other transactions trying to insert a row with the same id must wait until tx-A either commits or rolls back.

The reason is the isolation level. At “READ COMMITTED”, no other transaction may see what tx-A has done until tx-A is finished, so the database cannot yet tell a competing insert whether it violates the unique constraint, and makes it wait. This holds even if tx-A deletes the same row before it terminates.

If the transaction first inserts and immediately thereafter deletes the row, we can ensure that the locks are not “lost”:

  • If the entire transaction commits, both the insert and the delete are committed and nothing is left behind.
  • If the entire transaction rolls back, both the insert and the delete are rolled back and nothing is left behind.

JAVA low-level JDBC pseudo code:

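Connection con = openConnection();
con.setAutoCommit(false);
con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
try {
    // Blocks here while another transaction holds an uncommitted row with the same id
    PreparedStatement pInsert = con.prepareStatement(SQL_INSERT);
    pInsert.setString(1, uuid);
    pInsert.setString(2, objId);
    pInsert.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
    pInsert.executeUpdate();
    // Delete the row again right away; the lock is held until commit or rollback
    PreparedStatement pDelete = con.prepareStatement(SQL_DELETE);
    pDelete.setString(1, uuid);
    pDelete.executeUpdate();
    // do other stuff - work on object graph
    con.commit();
} catch (SQLException e) {
    con.rollback();
    throw e;
}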

JAVA using JPA (transaction handling done elsewhere or using CMT):

EntityManager em = ...;
// BusinessObjectLock is a persistent object
BusinessObjectLock lock = new BusinessObjectLock(objId);
em.persist(lock);
// Note: some JPA implementations are clever enough to just do nothing if the persist is followed directly by remove
em.flush();
em.remove(lock);
em.flush();

The flush() fires the insert and the delete against the database, possibly waiting if another transaction is doing the same insert with the same object-ID. Nothing is actually committed until later.

The sample code shown below demonstrates how three threads wait nicely for each other while trying to get the same lock. The sample uses a PostgreSQL database, but it could be any database implementing the READ COMMITTED isolation level:

package dk.judby.test.pgsql;
 
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
 
public class ConcurrentTest {
/*
 
Table businessobjectlock...
 
CREATE TABLE businessobjectlock
(
  uuid VARCHAR(32) NOT NULL,
  id VARCHAR(80) NOT NULL,
  locked_ts TIMESTAMP NOT NULL,
  CONSTRAINT pk_businessobjectlock PRIMARY KEY (uuid),
  CONSTRAINT id_businessobjectlock UNIQUE (id)
);
 
*/    
    public static final String DB_URL = "jdbc:postgresql:testdb";
    public static final String DB_USER = "test";
    public static final String DB_PASSWORD = "password";
    public static final String DB_DRIVER_CLASS = "org.postgresql.Driver";
 
    public static final String SQL_INSERT = "INSERT INTO businessobjectlock (uuid,id,locked_ts) VALUES (?,?,?)";
    public static final String SQL_DELETE = "DELETE FROM businessobjectlock WHERE uuid=?";
 
    static final String ID = "BondBO:DK0015966592";
 
    private static long nsStart;
 
    private static AtomicInteger atomicSeq = new AtomicInteger(0);
 
    static class LogItem {
        final int seq = atomicSeq.incrementAndGet();
        final int threadId;
        final long msTs;
        final String message;
        /**
         * @param threadId
         * @param msTs
         * @param message
         */
        LogItem(final int threadId, final long msTs, final String message) {
            super();
            this.threadId = threadId;
            this.msTs = msTs;
            this.message = message;
        }        
    };
 
    private static List<LogItem> log = new ArrayList<LogItem>();
 
    static void log(final int threadId, final String message) {
        LogItem li = new LogItem(threadId, System.nanoTime(), message);
        synchronized (log) {
            log.add(li);
        }
    }
 
    static class Runner extends Thread {
        final int id;
        final Object lock;
        final Connection con;
 
        /**
         * @param id
         * @param lock
         * @param con
         */
        Runner(final int id, final Object lock, final Connection con) {
            super();
            this.id = id;
            this.lock = lock;
            this.con = con;
            this.start();
        }
 
        public void run() {
            PreparedStatement psInsert = null;
            PreparedStatement psDelete = null;
            String uuid = generateUUID();
            try {
                synchronized (lock) {
                    lock.wait();
                }
                log(this.id, "Started");
 
                psInsert = con.prepareStatement(SQL_INSERT);
                psDelete = con.prepareStatement(SQL_DELETE);
                log(this.id, "Statements prepared, doing insert");
                psInsert.setString(1, uuid);
                psInsert.setString(2, ID);
                psInsert.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
                int ins = psInsert.executeUpdate();
                log(this.id, "rows inserted:"+ins);
 
                Thread.sleep(50);
 
                psDelete.setString(1, uuid);
                int del = psDelete.executeUpdate();
                log(this.id, "rows deleted:"+del);
 
                Thread.sleep(250);
            } catch (Exception e) {
                log(this.id, "Caught exception: "+e.toString());
                try {con.rollback();} catch (Exception ee) {ee.printStackTrace();}
            } finally {
                log(this.id, "doing commit...");
                try {con.commit();} catch (Exception e) {e.printStackTrace();}
                log(this.id, "commit done...");
                if (psInsert != null) {try { psInsert.close();} catch (Exception ignored){}}
                if (psDelete != null) {try { psDelete.close();} catch (Exception ignored){}}
                log(this.id, "done...");
            }
        }
    };
 
    public static void main(String[] args) {
        Connection con1 = null;
        Connection con2 = null;
        Connection con3 = null;
        Connection con4 = null;
 
        try {
            // Loading the class registers the driver; JDBC 4+ drivers also register automatically
            Class.forName(DB_DRIVER_CLASS);
 
            nsStart = System.nanoTime();
            log(0, "Started...");
 
            con1 = prepareConnection();
 
            con1.createStatement().executeUpdate("delete from businessobjectlock");
            con1.commit();
            con1.close();
 
            con2 = prepareConnection();
 
            con3 = prepareConnection();
 
            con4 = prepareConnection();
 
            Object lock = new Object();
            Runner run1 = new Runner(1, lock, con2);
            Runner run2 = new Runner(2, lock, con3);
            Runner run3 = new Runner(3, lock, con4);
 
            log(0, "Threads created");
            Thread.sleep(250);
 
            synchronized (lock) {
                lock.notifyAll();
            }
            log(0, "Threads running - waiting...");
            run1.join();
            run2.join();
            run3.join();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (con2 != null) try { con2.close(); } catch (Exception ignored) {} 
            if (con3 != null) try { con3.close(); } catch (Exception ignored) {}
            if (con4 != null) try { con4.close(); } catch (Exception ignored) {}
            log(0, "Done !");
            printLog(System.out);
        }
    }
 
    public static String generateUUID() {
        // 32 hex characters: the canonical UUID string with its four dashes removed
        return UUID.randomUUID().toString().replace("-", "");
    }
 
    private static void printLog(PrintStream out) {
        List<LogItem> list = new ArrayList<LogItem>(log);
        Collections.sort(list, new Comparator<LogItem>() {
            public int compare(LogItem o1, LogItem o2) {
                return Integer.compare(o1.seq, o2.seq);
            }
        });
        for (LogItem item : list) {
            out.print((item.msTs - nsStart)/1000000L);
            out.print("\t");
            out.print(item.threadId);
            out.print("\t");
            out.println(item.message);
        }
    }
 
    /**
     * @return
     * @throws SQLException
     */
    private static Connection prepareConnection() throws SQLException {
        Connection con1;
        con1 = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
 
        Timestamp ts = testConnection(con1);
        log(0, "Test connection: "+ts);
 
        con1.setAutoCommit(false);
        return con1;
    }
 
    public static Timestamp testConnection(Connection con) throws SQLException {
        // try-with-resources closes the statement and the result set, even if the query fails
        try (java.sql.Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("select now()")) {
            return rs.next() ? rs.getTimestamp(1) : null;
        }
    }
}

Note that the log is not printed until all threads are done, in order not to disturb the interaction between the threads/transactions. From the log it is clear that whenever a thread (transaction) commits, another thread (transaction) takes over:

line	ms	thread	message
1	1	0	Started...
2	215	0	Test connection: 2010-02-06 21:19:10.863
3	265	0	Test connection: 2010-02-06 21:19:10.926
4	307	0	Test connection: 2010-02-06 21:19:10.973
5	348	0	Test connection: 2010-02-06 21:19:11.019
6	350	0	Threads created
7	593	0	Threads running - waiting...
8	593	1	Started
9	593	3	Started
10	593	2	Started
11	594	3	Statements prepared, doing insert
12	594	1	Statements prepared, doing insert
13	594	2	Statements prepared, doing insert
14	601	1	rows inserted:1
15	659	1	rows deleted:1
16	905	1	doing commit...
17	920	1	commit done...
18	920	2	rows inserted:1
19	920	1	done...
20	971	2	rows deleted:1
21	1218	2	doing commit...
22	1223	2	commit done...
23	1223	2	done...
24	1223	3	rows inserted:1
25	1284	3	rows deleted:1
26	1530	3	doing commit...
27	1536	3	commit done...
28	1536	3	done...
29	1537	0	Done !

In line 17 thread #1 commits and immediately thread #2 takes over. In line 22 thread #2 commits and immediately thread #3 takes over.

We have effectively created an exclusive lock, where transactions wait nicely on each other before doing their business.

NOTE: although this works very well as a locking mechanism, don’t overuse it. If you lock on more than one entity in your system, you can get into situations where different threads try to acquire the same locks in a different order, and you have a DEADLOCK.
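One common way to reduce this risk, when a transaction really does need several locks, is to have every transaction acquire them in the same agreed order. The sketch below is an assumption for illustration and not part of the system described in this article; `LockOrdering` and `acquisitionOrder` are hypothetical names, and the lock IDs are assumed to be plain strings such as “BondBO:DK0015966592”.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

final class LockOrdering {
    private LockOrdering() {
    }

    /**
     * Returns the distinct lock IDs in natural String order.
     * If every transaction locks in this order, two transactions
     * cannot each hold a lock that the other one is waiting for.
     */
    static List<String> acquisitionOrder(Collection<String> lockIds) {
        // TreeSet removes duplicates and sorts in one step
        return new ArrayList<String>(new TreeSet<String>(lockIds));
    }
}
```

A transaction would then run the insert/delete pair once for each ID in the returned list, in that order, before starting its real work.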

Updated: 2011-08-15 18:24: I noticed today that the EclipseLink JPA implementation (2.1.3) that comes with WLS 10.3.5 needs a flush() between persist() and remove(); otherwise it does no DML and the trick does not work 🙂

About Jesper Udby

I'm a freelance computer Geek living in Denmark with my wife and 3 kids. I've done professional software development since 1994 and JAVA development since 1998.