Tuesday, May 12, 2015

Microsoft Exchange Server EWS Java API push notification working example

Microsoft Exchange Server EWS Java API push notification working example



First, connect to exchange server via EWS Java API as

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ExchangeService service; 
private void init() throws Exception {
 service = new ExchangeService(ExchangeVersion.Exchange2010_SP2);
 
 // Provide Crendentials
 ExchangeCredentials credentials = new WebCredentials("my_login_id",
   "nypassword", "my_login_domain");
 service.setCredentials(credentials);

 service.setUrl(new URI("https://somepoint.company.com/EWS/exchange.asmx"));
}

then, talk to exchange server to subscribe to push notification as
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private void subscribeToPush() throws Exception {
 WellKnownFolderName wkFolder = WellKnownFolderName.Inbox;
 FolderId folderId = new FolderId(wkFolder);
 List<FolderId> folder = new ArrayList<FolderId>();
 folder.add(folderId);
 
 URI callback = new URI("http://my_notification_listener_host:7777/podm/rs/emailnotification");
 
 PushSubscription pushSubscription = service.subscribeToPushNotifications(
   folder,
   callback /* The endpoint of the listener. */,
      5 /* Get a status event every 5 minutes if no new events are available. */,
      null  /* watermark: null to start a new subscription. */,
      EventType.NewMail);  
 System.out.println("PushSubscription = " + pushSubscription);
 
 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
 System.out.print("Enter String");
 String s = br.readLine();  
}

The push notification listener is implemented as Restful Web Service in Java as
1
2
3
4
5
6
7
8
9
@POST()
@Path("/emailnotification")
@Produces(MediaType.TEXT_XML)
public Response onNotificationReceived() throws Exception {
 logger.debug("received EWS notification");
 File file = new File("C:\\Users\\XYZ\\workspace\\POC\\ews_notification_response.xml");
 String responseXMLStr = IOUtils.toString(new FileInputStream(file));
 return Response.ok(responseXMLStr).build();
}

Once deployed to JBoss AS 7.1, below are the traffic back end force between notification listener and exchange server as
on new email arrived in INBOX, exchange server notify our registered notification listener via SOAP message as

notification SOAP message header:
POST /podm/rs/emailnotification HTTP/1.1
Content-Type: text/xml; charset=utf-8
Accept: text/xml
SOAPAction: http://schemas.microsoft.com/exchange/services/2006/messages/SendNotification
Host: my_notification_listener_host:8080
Content-Length: 1492
Connection: Close

notification SOAP message body:

<?xml version="1.0" encoding="UTF-8"?>
<soap11:Envelope xmlns:soap11="http://schemas.xmlsoap.org/soap/envelope/">
   <soap11:Header>
      <t:RequestServerVersion xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" Version="Exchange2010_SP2" />
   </soap11:Header>
   <soap11:Body>
      <m:SendNotification xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types">
         <m:ResponseMessages>
            <m:SendNotificationResponseMessage ResponseClass="Success">
               <m:ResponseCode>NoError</m:ResponseCode>
               <m:Notification>
                  <t:SubscriptionId>EABnaXphLWZlLm93ZmcuY29tEAAAAA4cC3zQY3VAn3vY0tY6uAZ8uvK7TlLSCA==</t:SubscriptionId>
                  <t:PreviousWatermark>AQAAAB0YmFETCChLogUFZRl0jsftgtkCAAAAAAA=</t:PreviousWatermark>
                  <t:MoreEvents>false</t:MoreEvents>
                  <t:NewMailEvent>
                     <t:Watermark>AQAAAB0YmFETCChLogUFZRl0jscHg9kCAAAAAAE=</t:Watermark>
                     <t:TimeStamp>2015-05-01T17:52:40Z</t:TimeStamp>
                     <t:ItemId Id="AAMkADAzNTU1N2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwBGAAAAAACwfA0zVZM5SIQ183rvBHkaBwAgnojIUDF8R5DPCbpjOk1jAAAATOY4AAAgnojIUDF8R5DPCbpjOk1jAAAATidsAAA=" ChangeKey="CQAAAA==" />
                     <t:ParentFolderId Id="AQMkADAzNTUBN2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwAuAAADsHwNM1WTOUiENfN67wR5GgEAIJ6IyFAxfEeQzwm6YzpNYwAAAUzmOAAAAA==" ChangeKey="AQAAAA==" />
                  </t:NewMailEvent>
               </m:Notification>
            </m:SendNotificationResponseMessage>
         </m:ResponseMessages>
      </m:SendNotification>
   </soap11:Body>
</soap11:Envelope>

Once our notification listener receives a notification from exchange server, it sends back a ACK response to exchange server and also let exchange server know it's still alive:
notification response message header:
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Content-Type: text/xml
Content-Length: 474
Date: Fri, 01 May 2015 17:52:41 GMT
Connection: close 
notification response message body:
<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <soap:Body>
      <m:SendNotificationResult>
         <m:SubscriptionStatus>OK</m:SubscriptionStatus>
      </m:SendNotificationResult>
   </soap:Body>
</soap:Envelope>


More traffic output:


14:38:29,151 DEBUG [jwang.poc.ejbs.EmailProcessorSB] (ServerService Thread Pool -- 64) subscription id=EABnaXphLWZlLm93ZmcuY29tEAAAAGcHwXOH1UFIuzwCceWIzEw2k58eE1vSCA==
14:38:29,152 INFO  [jwang.poc.ejbs.EmailProcessorSB] (ServerService Thread Pool -- 64) resumed subscription to push notification on exchange server with previous watermark = AQAAAB0YmFETCChLogUFZRl0jscNWfACAAAAAAA=


*** notification with events
<?xml version="1.0" encoding="UTF-8"?>
<soap11:Envelope xmlns:soap11="http://schemas.xmlsoap.org/soap/envelope/">
   <soap11:Header>
      <t:RequestServerVersion xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" Version="Exchange2010_SP2" />
   </soap11:Header>
   <soap11:Body>
      <m:SendNotification xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types">
         <m:ResponseMessages>
            <m:SendNotificationResponseMessage ResponseClass="Success">
               <m:ResponseCode>NoError</m:ResponseCode>
               <m:Notification>
                  <t:SubscriptionId>EABnaXphLWZlLm93ZmcuY29tEAAAAGcHwXOH1UFIuzwCceWIzEw2k58eE1vSCA==</t:SubscriptionId>
                  <t:PreviousWatermark>AQAAAB0YmFETCChLogUFZRl0jscE+vACAAAAAAE=</t:PreviousWatermark>
                  <t:MoreEvents>false</t:MoreEvents>
                  <t:NewMailEvent>
                     <t:Watermark>AQAAAB0YmFETCChLogUFZRl0jsd/BPECAAAAAAE=</t:Watermark>
                     <t:TimeStamp>2015-05-12T21:39:07Z</t:TimeStamp>
                     <t:ItemId Id="AAMkADAzNTU1N2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwBGAAAAAACwfA0zVZM5SIQ183rvBHkaBwAgnojIUDF8R5DPCbpjOk1jAAAATOY4AAAgnojIUDF8R5DPCbpjOk1jAAAATigPAAA=" ChangeKey="CQAAAA==" />
                     <t:ParentFolderId Id="AQMkADAzNTUBN2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwAuAAADsHwNM1WTOUiENfN67wR5GgEAIJ6IyFAxfEeQzwm6YzpNYwAAAUzmOAAAAA==" ChangeKey="AQAAAA==" />
                  </t:NewMailEvent>
               </m:Notification>
            </m:SendNotificationResponseMessage>
         </m:ResponseMessages>
      </m:SendNotification>
   </soap11:Body>
</soap11:Envelope>


** notification status ping from server without events
<?xml version="1.0" encoding="UTF-8"?>
<soap11:Envelope xmlns:soap11="http://schemas.xmlsoap.org/soap/envelope/">
   <soap11:Header>
      <t:RequestServerVersion xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" Version="Exchange2010_SP2" />
   </soap11:Header>
   <soap11:Body>
      <m:SendNotification xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types">
         <m:ResponseMessages>
            <m:SendNotificationResponseMessage ResponseClass="Success">
               <m:ResponseCode>NoError</m:ResponseCode>
               <m:Notification>
                  <t:SubscriptionId>EABnaXphLWZlLm93ZmcuY29tEAAAAGcHwXOH1UFIuzwCceWIzEw2k58eE1vSCA==</t:SubscriptionId>
                  <t:PreviousWatermark>AQAAAB0YmFETCChLogUFZRl0jsd/BPECAAAAAAE=</t:PreviousWatermark>
                  <t:MoreEvents>false</t:MoreEvents>
                  <t:StatusEvent>
                     <t:Watermark>AQAAAB0YmFETCChLogUFZRl0jscbCPECAAAAAAE=</t:Watermark>
                  </t:StatusEvent>
               </m:Notification>
            </m:SendNotificationResponseMessage>
         </m:ResponseMessages>
      </m:SendNotification>
   </soap11:Body>
</soap11:Envelope>

By observation, exchange server seems generate new subscription id for every new subscription to push notification request (even with valid previous watermark parameter). After JBoss AS restars and new email arrives in inbox, exchange server will send notification event to all subscriptions (old and new). When we received a notification, we can check if its notification id matches our current active notification id, if not, we can response with "unscribe" to the exchange server to unscribe the stale subscription.

Unsubscribe response as:

<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <soap:Body>
      <m:SendNotificationResult>
         <m:SubscriptionStatus>Unsubscribe</m:SubscriptionStatus>
      </m:SendNotificationResult>
   </soap:Body>
</soap:Envelope>

as for the notification of current active notification id, we can response with OK to tell the exchange server we got the message and we're still alive:
OK response as:

<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:m="http://schemas.microsoft.com/exchange/services/2006/messages" xmlns:t="http://schemas.microsoft.com/exchange/services/2006/types" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <soap:Body>
      <m:SendNotificationResult>
         <m:SubscriptionStatus>OK</m:SubscriptionStatus>
      </m:SendNotificationResult>
   </soap:Body>
</soap:Envelope>

Customized HA-ready JavaEE timer to invoke EJB methods with Tx support

In my application, I need an non-persistent EJB timer to trigger my EJB methods as scheduled.

JBOSS AS 7.1, however, seems to have an issue/bug in supporting non-persistent ejb timers: JBOSS always treat timer as persistent, even though the timer's been configured as non-persistent. This undesired behaviour causes JBOSS to invoke timer seemingly-endlessly right after the server got restarted.

To overcome this, I implement my own timer to trigger EJB methods.
Multiple JBOSS AS, each hosts

  • Timer: EmailProcessorInvoker
  • Singleton EJBEmailProcessorSB


In this setup, 
  • ejb hosts core business logic (processing incoming emails from exchange server)
  • ejb email processing logic is triggered by the timer
  • timer works in its own thread (started by ejb:postconstruct )
  • timer uses JGroups to form a cluster of timers and only cluster leader works as active timer to trigger its local EJB's email processing logic
  • timer MUST hold EJB's business object to be able to use JavaEE container's Tx support (line#82)
Sample Java:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package jwang.poc.ejbs;

import static javax.ejb.TransactionAttributeType.REQUIRES_NEW;

import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Timestamp;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;

import microsoft.exchange.webservices.data.core.ExchangeService;
import microsoft.exchange.webservices.data.core.PropertySet;
import microsoft.exchange.webservices.data.core.service.item.Item;
import microsoft.exchange.webservices.data.core.service.schema.EmailMessageSchema;
import microsoft.exchange.webservices.data.core.service.schema.ItemSchema;
import microsoft.exchange.webservices.data.credential.ExchangeCredentials;
import microsoft.exchange.webservices.data.credential.WebCredentials;
import microsoft.exchange.webservices.data.enumeration.BasePropertySet;
import microsoft.exchange.webservices.data.enumeration.ExchangeVersion;
import microsoft.exchange.webservices.data.enumeration.LogicalOperator;
import microsoft.exchange.webservices.data.enumeration.SortDirection;
import microsoft.exchange.webservices.data.enumeration.WellKnownFolderName;
import microsoft.exchange.webservices.data.property.complex.AttachmentCollection;
import microsoft.exchange.webservices.data.property.complex.FileAttachment;
import microsoft.exchange.webservices.data.property.complex.FolderId;
import microsoft.exchange.webservices.data.search.FindItemsResults;
import microsoft.exchange.webservices.data.search.ItemView;
import microsoft.exchange.webservices.data.search.filter.SearchFilter;
import microsoft.exchange.webservices.data.search.filter.SearchFilter.SearchFilterCollection;

import org.jboss.logging.Logger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

import jwang.poc.model.OrdrDoc;

@Singleton
@LocalBean
@Startup
@TransactionManagement(value=TransactionManagementType.CONTAINER)
public class EmailProcessorSB {
 private static final String DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD = "<some keywords>";

 @PersistenceContext(unitName = "pocDS", type = PersistenceContextType.EXTENDED)
    private EntityManager em;
 
 @Resource 
 private SessionContext sc;
 
 private Logger logger = Logger.getLogger(EmailProcessorSB.class);
 private ExchangeService service;
 private Thread invoker;
 
    @PostConstruct public void init() {
  service = new ExchangeService(ExchangeVersion.Exchange2010_SP2);
  
  // Provide Crendentials
  ExchangeCredentials credentials = new WebCredentials("<user_id>", "<user_pwd>", "<domain>");
  service.setCredentials(credentials);
  try {
   service.setUrl(new URI("https://exchange-server.company.com/EWS/exchange.asmx"));
  } catch (URISyntaxException e) {
   e.printStackTrace();
  }
     
  invoker = new Thread(new EmailProcessorInvoker(sc.getBusinessObject(EmailProcessorSB.class)));
  invoker.start();
  
  logger.debug("init done.");
    }
    
    @PreDestroy public void destroy() {
     invoker.interrupt();
    }
    
    @TransactionAttribute(value=REQUIRES_NEW)
    public void process() throws Exception {
  // reading emails, Get 10 items from mail box
  ItemView view = new ItemView(10);
  
  // sort emails by data (oldest first)
  view.getOrderBy().add(ItemSchema.DateTimeReceived, SortDirection.Ascending);
  
  // get unread emails and whose subject contains "key words"
  SearchFilterCollection sfc = new SearchFilter.SearchFilterCollection(LogicalOperator.And, 
    new SearchFilter.IsEqualTo(EmailMessageSchema.IsRead, false),
    new SearchFilter.ContainsSubstring(ItemSchema.Subject, DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD)); 
  
  // Search Inbox
  FindItemsResults<Item> findResults = service.findItems(WellKnownFolderName.Inbox, sfc , view);
  
  if (findResults.getTotalCount() == 0) {
   return;
  }
  
  // also load attachments for findResults
  service.loadPropertiesForItems(findResults, new PropertySet(
   BasePropertySet.FirstClassProperties,
   EmailMessageSchema.Attachments));

  // iterate thru items
  for (Item item : findResults.getItems()) {
   // process each email
   logger.debug("email id=" + item.getId() + ", subject=" + item.getSubject());
   
   // extract po# and contentTags from subject 
   // subject: PO#12121313: CCI, SSO
   String subjectStr = item.getSubject().trim();
   int idx = subjectStr.indexOf(':');
   String poid = subjectStr.substring(DOC_UPLOAD_EMAIL_SUBJECT_KEYWORD.length(), idx);
   String contentTags = subjectStr.substring(idx + 1).trim();
      
   if (item.getHasAttachments()) {
    AttachmentCollection attachmentsCol = item.getAttachments();
    logger.debug("attachments count=" + attachmentsCol.getCount());
    
    for (int i = 0; i < attachmentsCol.getCount(); i++) {
              FileAttachment attachment=(FileAttachment)attachmentsCol.getPropertyAtIndex(i);
              String fileName = attachment.getName();
              
     idx = fileName.lastIndexOf('.');
     String fileExt = (idx == -1) ? "" : fileName.substring(idx);
     
     // create OrdrDoc first to get its auto-generated doc number 
     OrdrDoc doc = new OrdrDoc();
     doc.setUploadedAt(new Timestamp(System.currentTimeMillis()));
     doc.setOrdrNumb(new BigDecimal(poid));
     doc.setFilePath("override later");
     
     em.persist(doc);
     em.flush();    
     
     // constructs upload file path
        String rootPath = System.getProperty("jwang.poc.document.STORAGE_ROOT_PATH");
              
     // download attachments to local
     String filePath = rootPath + poid + "_" + doc.getDocNumb() + fileExt;                  
              attachment.load(filePath);
              
              logger.debug("saved attachment " + attachment.getName() + " to " + filePath);
              
     doc.setOrdrNumb(new BigDecimal(poid));
     doc.setDocName(fileName);
     doc.setDocContentTags(contentTags);
     doc.setFilePath(filePath);
     doc.setIsPodetail("N");
     doc.setUploadedAt(new Timestamp(System.currentTimeMillis()));
     
     logger.debug("persisted order doc id=" + doc.getDocNumb());
    }
   }
   
   // once it's processed, move email to other folder
   item.move(new FolderId("AQMkADAzNTUBN2I2LWY1MTgtNDFmMC1iMzY0LWQ0YzE0NzJiNTBlYwAuAAADsHwNM1WTOUiENfN67wR5GgEAsvx7FRTWH0uZkC5Z4WiWCgBMLq2AEwAAAA=="));   
   logger.debug("Done! document upload processing completed, request email moved to archiving folder 'Processed Request'.");
  }  
 }
    
    
    private class EmailProcessorInvoker extends ReceiverAdapter implements Runnable {
     private Logger logger = Logger.getLogger(EmailProcessorInvoker.class);
     
     private JChannel channel;
     private boolean isLeader;
     
     private EmailProcessorSB processor;
     
     public EmailProcessorInvoker(EmailProcessorSB processor) {
      this.isLeader = true;
      this.processor = processor;
     }
     
     @Override
     public void run() {
         try {
             channel=new JChannel(); // use the default config, udp.xml
             channel.setReceiver(this);
       channel.connect("EmailProcessorCluster");
          logger.debug("EmailProcessorInvoker started, emailProcessor=" + processor);
         
             while(true) {
        Thread.sleep(5000);
        if (isLeader) processor.process();
             }
      } catch (InterruptedException e) {
             channel.close();
       logger.debug("EmailProcessorInvoker stopped");
      } catch (Exception e) {
       e.printStackTrace();
       logger.fatal(e.toString());
      }
     }
     
        public void viewAccepted(View new_view) {
         logger.debug("cluster view: " + new_view);
            
            Address leader = channel.getView().getMembers().get(0);
            Address myself = channel.getAddress();
            isLeader = leader.equals(myself);
            
            logger.debug("leader=" + leader + ", myself=" + myself + ", isLeader? " + isLeader);
        }

        // not used
        public void receive(Message msg) {
         logger.debug(msg.getSrc() + ": " + msg.getObject());
        }

    }    
    
}