I want to send the calculation results of my DataStream flow to another service over HTTP. I see two possible ways to implement it:
- Use the synchronous Apache HttpClient in the sink:
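import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
public class SyncHttpSink extends RichSinkFunction<SessionItem> {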
    private static final String URL = "http://httpbin.org/post";
    private CloseableHttpClient httpClient;
    private Histogram httpStatusesAccumulator;
    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();
        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }
    @Override
    public void close() throws Exception {
        httpClient.close();
        httpStatusesAccumulator.resetLocal();
    }
    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);
        // Blocks the calling (task) thread until the response arrives
        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
            int httpStatusCode = response.getStatusLine().getStatusCode();
            httpStatusesAccumulator.add(httpStatusCode);
        }
    }
}
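- Use the asynchronous Apache HttpAsyncClient in the sink:
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.message.BasicNameValuePair;
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {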
    private static final String URL = "http://httpbin.org/post";
    private CloseableHttpAsyncClient httpClient;
    private Histogram httpStatusesAccumulator;
    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                .build();
        httpClient.start();
        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }
    @Override
    public void close() throws Exception {
        httpClient.close();
        httpStatusesAccumulator.resetLocal();
    }
    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);
        // Returns immediately; the callback is invoked later on one of the client's I/O threads
        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                int httpStatusCode = response.getStatusLine().getStatusCode();
                httpStatusesAccumulator.add(httpStatusCode);
            }
            @Override
            public void failed(Exception ex) {
                httpStatusesAccumulator.add(-1); // -1 - failed
            }
            @Override
            public void cancelled() {
                httpStatusesAccumulator.add(-2); // -2 - cancelled
            }
        });
    }
}
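To make the behavioral difference between the two sinks concrete, here is a stdlib-only sketch (no Flink or HttpClient involved) of one common pattern for an async sink: bounding the number of in-flight requests with a `Semaphore`, so the calling thread blocks once the limit is reached, and waiting for outstanding requests in `close()` before shutting down. `BoundedAsyncSender`, `simulateHttpPost`, the limit of 8 and the 4-thread pool are all hypothetical; the executor only stands in for HttpAsyncClient's I/O threads, and in a real sink the `release()` would have to happen in `completed`, `failed` and `cancelled`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: limits how many async "requests" may be pending at once.
public class BoundedAsyncSender implements AutoCloseable {
    private final int maxInFlight;
    private final Semaphore permits;
    // Stands in for the async HTTP client's I/O threads (assumption for the sketch).
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);
    private final AtomicInteger completed = new AtomicInteger();

    public BoundedAsyncSender(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Would be called from invoke(): blocks only while maxInFlight requests are pending.
    public void send(String payload) throws InterruptedException {
        permits.acquire();
        CompletableFuture.runAsync(() -> simulateHttpPost(payload), ioPool)
                .whenComplete((ignored, error) -> {
                    // Runs on success and on failure, so the permit is always returned.
                    completed.incrementAndGet();
                    permits.release();
                });
    }

    // Hypothetical stand-in for the real HTTP POST; just sleeps briefly.
    private static void simulateHttpPost(String payload) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Would be called from close(): getting all permits back means nothing is in flight.
    @Override
    public void close() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
        ioPool.shutdown();
        ioPool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public int completedCount() {
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedAsyncSender sender = new BoundedAsyncSender(8);
        for (int i = 0; i < 100; i++) {
            sender.send("item-" + i); // blocks whenever 8 requests are already pending
        }
        sender.close();
        System.out.println(sender.completedCount()); // prints 100
    }
}
```

In this sketch `send()` only blocks when eight requests are pending, so a slow remote service eventually stalls the caller instead of letting pending requests pile up without limit, and `close()` no longer shuts things down while requests are still outstanding.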
Questions:
- Should I use a sync or an async HTTP client in the sink?
- If I use the sync client, it will block the sink, and through backpressure Flink will block the source. Right?
- If I use the async client, it won't block the sink. Right?
- Are accumulators thread-safe, i.e. can I use them in an async callback?
- Is RuntimeContext thread-safe, i.e. can I use it in an async callback?
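If it turns out that the `Histogram` accumulator (or anything obtained from `RuntimeContext`) must not be touched from the HTTP client's callback threads, one possible workaround is to count statuses in a concurrent structure and hand the totals over from the task thread later. Below is a minimal stdlib-only sketch of that hand-off idea; `StatusCounter` is hypothetical, the thread pool only simulates the async client's callback threads, and copying the snapshot into the Flink accumulator is not shown.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe status tally that callbacks can update concurrently.
public class StatusCounter {
    private final ConcurrentHashMap<Integer, LongAdder> counts = new ConcurrentHashMap<>();

    // Safe to call from many callback threads at the same time.
    public void record(int statusCode) {
        counts.computeIfAbsent(statusCode, k -> new LongAdder()).increment();
    }

    // Sorted copy of the current totals, meant to be read on the task thread.
    public Map<Integer, Long> snapshot() {
        Map<Integer, Long> result = new TreeMap<>();
        counts.forEach((code, adder) -> result.put(code, adder.sum()));
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        StatusCounter counter = new StatusCounter();
        ExecutorService callbacks = Executors.newFixedThreadPool(4); // simulated callback threads
        CountDownLatch done = new CountDownLatch(1000);
        for (int i = 0; i < 1000; i++) {
            int code = (i % 10 == 0) ? -1 : 200; // every tenth "request" fails
            callbacks.execute(() -> {
                counter.record(code);
                done.countDown();
            });
        }
        done.await(); // all record() calls happen-before this returns
        callbacks.shutdown();
        System.out.println(counter.snapshot()); // prints {-1=100, 200=900}
    }
}
```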