代码示例 (Java)

初始化:

String endpoint = "http://staging.emq.api.xiaomi.com";
// 设置ak sk
Credential credential = new Credential().setSecretKeyId(secretKeyId).
    setSecretKey(secretKey).setType(UserType.APP_SECRET);
EMQClientFactory clientFactory = new EMQClientFactory(credential,
    generateHttpClient(10, 10));

// properties配置
Properties properties = new Properties();
properties.setProperty("galaxy.emq.service.endpoint", endpoint); // endpoint
properties.setProperty("galaxy.emq.client.timeout", String.valueOf(30000)); // 可选 socket 超时时间
properties.setProperty("galaxy.emq.client.conn.timeout", String.valueOf(60000)); // 可选 客户端超时时间
properties.setProperty("galaxy.emq.client.auto.retry", String.valueOf(true));  // 可选 是否重试
properties.setProperty("galaxy.emq.client.retry.number", String.valueOf(3));  // 可选 重试次数
EMQClientConfig config = new EMQClientConfig(properties);

QueueService.Iface queueClient = clientFactory.newQueueClient(config);
MessageService.Iface messageClient = clientFactory.newMessageClient(config);

创建队列:

  CreateQueueRequest createQueueRequest = new CreateQueueRequest(name);
  CreateQueueResponse createQueueResponse = queueClient.createQueue(
      createQueueRequest);
  // 之后的操作都需要使用这里的queueName,而不是第一行中的name
  String queueName = createQueueResponse.getQueueName();

发送者:

  while(true) {
    SendMessageRequest sendMessageRequest =
        new SendMessageRequest(queueName, messageBody);
    SendMessageResponse sendMessageResponse =
        messageClient.sendMessage(sendMessageRequest);
  }

接收者:

  while(true) {
    ReceiveMessageRequest receiveMessageRequest =
        new ReceiveMessageRequest(queueName);
    List<ReceiveMessageResponse> receiveMessageResponse =
        messageClient.receiveMessage(receiveMessageRequest);

    if (!receiveMessageResponse.isEmpty()) {

      // process receiveMessageResponse

      DeleteMessageBatchRequest deleteMessageBatchRequest =
          new DeleteMessageBatchRequest();
      deleteMessageBatchRequest.setQueueName(queueName);
      for (ReceiveMessageResponse response : receiveMessageResponse) {
        deleteMessageBatchRequest.addToDeleteMessageBatchRequestEntryList(
            new DeleteMessageBatchRequestEntry(response.getReceiptHandle()));
      }
      messageClient.deleteMessageBatch(deleteMessageBatchRequest);
    }
  }

删除队列:

  DeleteQueueRequest deleteQueueRequest = new DeleteQueueRequest(queueName);
  queueClient.deleteQueue(deleteQueueRequest);

更多更详细的示例请参考各语言SDK下的Example, 或直接参考:SDK Example

results matching ""

    No results matching ""