Tuesday, May 12, 2015

Customized HA-ready JavaEE timer to invoke EJB methods with Tx support

In my application, I need an non-persistent EJB timer to trigger my EJB methods as scheduled.

JBOSS AS 7.1, however, seems to have an issue/bug in supporting non-persistent ejb timers: JBOSS always treat timer as persistent, even though the timer's been configured as non-persistent. This undesired behaviour causes JBOSS to invoke timer seemingly-endlessly right after the server got restarted.

To overcome this, I implement my own timer to trigger EJB methods.
Multiple JBOSS AS, each hosts

  • Timer: EmailProcessorInvoker
  • Singleton EJBEmailProcessorSB


In this setup, 
  • ejb hosts core business logic (processing incoming emails from exchange server)
  • ejb email processing logic is triggered by the timer
  • timer works in its own thread (started by ejb:postconstruct )
  • timer uses JGroups to form a cluster of timers and only cluster leader works as active timer to trigger its local EJB's email processing logic
  • timer MUST hold EJB's business object to be able to use JavaEE container's Tx support (line#82)
Sample Java:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package jwang.poc.ejbs;

import static javax.ejb.TransactionAttributeType.REQUIRES_NEW;

import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Timestamp;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;

import microsoft.exchange.webservices.data.core.ExchangeService;
import microsoft.exchange.webservices.data.core.PropertySet;
import microsoft.exchange.webservices.data.core.service.item.Item;
import microsoft.exchange.webservices.data.core.service.schema.EmailMessageSchema;
import microsoft.exchange.webservices.data.core.service.schema.ItemSchema;
import microsoft.exchange.webservices.data.credential.ExchangeCredentials;
import microsoft.exchange.webservices.data.credential.WebCredentials;
import microsoft.exchange.webservices.data.enumeration.BasePropertySet;
import microsoft.exchange.webservices.data.enumeration.ExchangeVersion;
import microsoft.exchange.webservices.data.enumeration.LogicalOperator;
import microsoft.exchange.webservices.data.enumeration.SortDirection;
import microsoft.exchange.webservices.data.enumeration.WellKnownFolderName;
import microsoft.exchange.webservices.data.property.complex.AttachmentCollection;
import microsoft.exchange.webservices.data.property.complex.FileAttachment;
import microsoft.exchange.webservices.data.property.complex.FolderId;
import microsoft.exchange.webservices.data.search.FindItemsResults;
import microsoft.exchange.webservices.data.search.ItemView;
import microsoft.exchange.webservices.data.search.filter.SearchFilter;
import microsoft.exchange.webservices.data.search.filter.SearchFilter.SearchFilterCollection;

import org.jboss.logging.Logger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

import jwang.poc.model.OrdrDoc;

@Singleton
@LocalBean
@Startup
@TransactionManagement(value=TransactionManagementType.CONTAINER)
public class EmailProcessorSB {
 private static final String DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD = "<some keywords>";

 @PersistenceContext(unitName = "pocDS", type = PersistenceContextType.EXTENDED)
    private EntityManager em;
 
 @Resource 
 private SessionContext sc;
 
 private Logger logger = Logger.getLogger(EmailProcessorSB.class);
 private ExchangeService service;
 private Thread invoker;
 
    @PostConstruct public void init() {
  service = new ExchangeService(ExchangeVersion.Exchange2010_SP2);
  
  // Provide Crendentials
  ExchangeCredentials credentials = new WebCredentials("<user_id>", "<user_pwd>", "<domain>");
  service.setCredentials(credentials);
  try {
   service.setUrl(new URI("https://exchange-server.company.com/EWS/exchange.asmx"));
  } catch (URISyntaxException e) {
   e.printStackTrace();
  }
     
  invoker = new Thread(new EmailProcessorInvoker(sc.getBusinessObject(EmailProcessorSB.class)));
  invoker.start();
  
  logger.debug("init done.");
    }
    
    @PreDestroy public void destroy() {
     invoker.interrupt();
    }
    
    @TransactionAttribute(value=REQUIRES_NEW)
    public void process() throws Exception {
  // reading emails, Get 10 items from mail box
  ItemView view = new ItemView(10);
  
  // sort emails by data (oldest first)
  view.getOrderBy().add(ItemSchema.DateTimeReceived, SortDirection.Ascending);
  
  // get unread emails and whose subject contains "key words"
  SearchFilterCollection sfc = new SearchFilter.SearchFilterCollection(LogicalOperator.And, 
    new SearchFilter.IsEqualTo(EmailMessageSchema.IsRead, false),
    new SearchFilter.ContainsSubstring(ItemSchema.Subject, DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD)); 
  
  // Search Inbox
  FindItemsResults<Item> findResults = service.findItems(WellKnownFolderName.Inbox, sfc , view);
  
  if (findResults.getTotalCount() == 0) {
   return;
  }
  
  // also load attachments for findResults
  service.loadPropertiesForItems(findResults, new PropertySet(
   BasePropertySet.FirstClassProperties,
   EmailMessageSchema.Attachments));

  // iterate thru items
  for (Item item : findResults.getItems()) {
   // process each email
   logger.debug("email id=" + item.getId() + ", subject=" + item.getSubject());
   
   // extract po# and contentTags from subject 
   // subject: PO#12121313: CCI, SSO
   String subjectStr = item.getSubject().trim();
   int idx = subjectStr.indexOf(':');
   String poid = subjectStr.substring(DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD.length(), idx);
   String contentTags = subjectStr.substring(idx + 1).trim();
      
   if (item.getHasAttachments()) {
    AttachmentCollection attachmentsCol = item.getAttachments();
    logger.debug("attachments count=" + attachmentsCol.getCount());
    
    for (int i = 0; i < attachmentsCol.getCount(); i++) {
              FileAttachment attachment=(FileAttachment)attachmentsCol.getPropertyAtIndex(i);
              String fileName = attachment.getName();
              
     idx = fileName.lastIndexOf('.');
     String fileExt = (idx == -1) ? "" : fileName.substring(idx);
     
     // create OrdrDoc first to get its auto-generated doc number 
     OrdrDoc doc = new OrdrDoc();
     doc.setUploadedAt(new Timestamp(System.currentTimeMillis()));
     doc.setOrdrNumb(new BigDecimal(poid));
     doc.setFilePath("override later");
     
     em.persist(doc);
     em.flush();    
     
     // constructs upload file path
        String rootPath = System.getProperty("jwang.poc.document.STORAGE_ROOT_PATH");
              
     // download attachments to local
     String filePath = rootPath + poid + "_" + doc.getDocNumb() + fileExt;                  
              attachment.load(filePath);
              
              logger.debug("saved attachment " + attachment.getName() + " to " + filePath);
              
     doc.setOrdrNumb(new BigDecimal(poid));
     doc.setDocName(fileName);
     doc.setDocContentTags(contentTags);
     doc.setFilePath(filePath);
     doc.setIsPodetail("N");
     doc.setUploadedAt(new Timestamp(System.currentTimeMillis()));
     
     logger.debug("persisted order doc id=" + doc.getDocNumb());
    }
   }
   
   // once it's processed, move email to other folder
   item.move(new FolderId("AQMkADAzNTUBN2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwAuAAADsHwNM1WTOUiENfN67wR5GgEAsvx7FRTWH0uZkC5Z4WiWCgBMLq2AEwAAAA=="));   
   logger.debug("Done! document upload processing completed, request email moved to archiving folder 'Processed Request'.");
  }  
 }
    
    
    private class EmailProcessorInvoker extends ReceiverAdapter implements Runnable {
     private Logger logger = Logger.getLogger(EmailProcessorInvoker.class);
     
     private JChannel channel;
     private boolean isLeader;
     
     private EmailProcessorSB processor;
     
     public EmailProcessorInvoker(EmailProcessorSB processor) {
      this.isLeader = true;
      this.processor = processor;
     }
     
     @Override
     public void run() {
         try {
             channel=new JChannel(); // use the default config, udp.xml
             channel.setReceiver(this);
       channel.connect("EmailProcessorCluster");
          logger.debug("EmailProcessorInvoker started, emailProcessor=" + processor);
         
             while(true) {
        Thread.sleep(5000);
        if (isLeader) processor.process();
             }
      } catch (InterruptedException e) {
             channel.close();
       logger.debug("EmailProcessorInvoker stopped");
      } catch (Exception e) {
       e.printStackTrace();
       logger.fatal(e.toString());
      }
     }
     
        public void viewAccepted(View new_view) {
         logger.debug("cluster view: " + new_view);
            
            Address leader = channel.getView().getMembers().get(0);
            Address myself = channel.getAddress();
            isLeader = leader.equals(myself);
            
            logger.debug("leader=" + leader + ", myself=" + myself + ", isLeader? " + isLeader);
        }

        // not used
        public void receive(Message msg) {
         logger.debug(msg.getSrc() + ": " + msg.getObject());
        }

    }    
    
}

1 comment: