技术开发 频道

如何通过DB2和WebSphere使用协作事务

使用全局连接工厂和线程级别连接缓存

    如上面所述,当包含使用此 JDBC 访问方法的 Java Compute 节点的 Message Broker 流为了实现伸缩性而需要并发处理多个事务时,将使用其他实例对其进行部署,以提供线程池。即,将在 BAR File Editor 中设置 additionaInstances 属性来定义在池中可用的线程数量。在这种情况下,必须确保从 JDBC Type 2 驱动程序获得的 JDBC 连接句柄仅获取一次,并在池中每个可用线程处理的后续事务上进行重用。此处给出的解决方案通过两个阶段处理该需求:

  1. 提供了独立连接工厂来确保池中的每个线程仅连接一次。工厂类保留已连接的线程的静态映射,可以在线程进行调用时返回现有连接句柄,或创建并存储连接句柄。
  2. 我们将说明如何利用 Java ThreadLocal 来从连接工厂提供连接句柄的每个线程缓存,以避免每次各个线程处理新事务时锁定带来的开销。

    我们之所以需要这个两个级别的实现,目的是为了方便关闭连接。当删除 Broker 消息流时,或所属执行组结束时,Broker 并不会对池中处于活动状态的每个线程调用 termination 方法。因此,如果我们只有数据库连接句柄的 ThreadLocal 缓存,我们将不能发起对所有开放句柄的关闭操作。通过使用基础独立连接工厂,可使用其 onDelete() 方法来提供可关闭所有开放数据库句柄的点,并确保能够干净地关闭数据库管理器。

    独立连接工厂可以使用 Runnable 接口进行扩展,以提供基于非活动超时的连接关闭。但我们在代码中尚未实现此功能。

    首先,我们将讨论连接工厂独立类 DB2JdbcXaThreadConnectionFactory。提供了单个公共方法来允许线程获得在该线程上使用的连接句柄:

/** * Retrieve an existing connection for this thread to this DB2 instance or * create and store a new connection * * @param DB2DB_Name * @param DB2User_ID * @param DB2User_PASSWORD * @return * @throws DB2JdbcXaTestException */ public static Connection getConnection(String DB2DB_Name, String DB2User_ID, String DB2User_PASSWORD) throws DB2JdbcXaTestException { DB2JdbcXaThreadConnectionFactory instance = getInstance(); return instance.getCreateConnection(DB2DB_Name, DB2User_ID, DB2User_PASSWORD); }

    getInstance() 方法使用标准独立模式来检索或创建 DB2JdbcXaThreadConnectionFactory 类实例。 getCreateConnection() 方法使用静态哈希表来确定是否可以返回现有连接,或需要进行创建。

 

/** * Retrieve an existing connection for this thread to this DB2 instance or * create and store a new connection * * @param DB2DB_Name * @param user_ID * @param user_PASSWORD * @return Connection * @throws DB2JdbcXaTestException */ private Connection getCreateConnection(String DB2DB_Name, String user_ID, String user_PASSWORD) throws DB2JdbcXaTestException { Connection connObj = null; StringBuffer connectionKeyStrBuff = new StringBuffer(); connectionKeyStrBuff.append(Thread.currentThread().getName()); connectionKeyStrBuff.append('#'); connectionKeyStrBuff.append(DB2DB_Name); connectionKeyStrBuff.append('#'); connectionKeyStrBuff.append(user_ID); String connectionKeyString = connectionKeyStrBuff.toString(); synchronized (connections) { connObj = (Connection) connections.get(connectionKeyString); if (connObj == null) { if (isDebugEnabled) { System.out.println("Create Connection: " + connectionKeyString); } connObj = createConnection(DB2DB_Name, user_ID, user_PASSWORD); connections.put(connectionKeyString, connObj); } } return connObj; }

    如果尚不存在此数据和线程的连接,则将调用 createConnection() 方法来进行连接。

    接下来,考虑如何从 Java Compute 节点使用此连接工厂独立类 DB2JdbcXaThreadConnectionFactory 并缓存到 Java ThreadLocal 中。在 DB2JDBCXaJavaComputeBase 类中,通过扩展 MbJavaComputeNode 来实现 Broker Java Compute 节点。在此类中,将定义类 ThreadLocalConnection 来扩展 ThreadLocal 并实现 initialValue() 方法来从我们的连接工厂独立类 DB2JdbcXaThreadConnectionFactory 获取连接句柄。

// JCN Class object instance thread cache of the global DB2 instance // connection // obtained for each thread that executes the JCN in this flow private class ThreadLocalConnection extends ThreadLocal { public Object initialValue() { Connection jdbcXaConn = null; try { if (isDebugEnabled) { System.out .println("Obtain thread local connection for Flow " + flowName); } jdbcXaConn = DB2JdbcXaThreadConnectionFactory.getConnection( DB2DB_Name, DB2User_ID, DB2User_PASSWORD); } catch (DB2JdbcXaTestException e) { System.err.println("Failed getConnection(): " + e.toString()); e.printStackTrace(); noXaConnCause = e; } return jdbcXaConn; } }


    因此我们可以在每个消息处理调用 Java Compute 节点 DB2JDBCXaJavaComputeBase 类的 evaluate() 方法期间通过 ThreadLocal 对象的 get() 方法直接获得数据库连接句柄供使用,而这将在池中的每个线程的第一次调用时调用 initialValue() 方法:

/** * Obtain the DB2 JDBC Connection either from the local class threadlocal or * from the global DB2JdbcXaThreadConnectionFactory * * @return Connection * @throws DB2JdbcXaTestException */ public Connection getConnection() throws DB2JdbcXaTestException { Connection xaConn = (Connection) conn.get(); if (xaConn == null) { throw noXaConnCause; } return xaConn; }
0
相关文章