I have a DSL (shown below) that ends on "log", so the json produced from the jdbc source should be logged and it's not.
The Supplier reads a database queue and produce a json array for the rows.
If I turn on logging, the                             SybaseSupplierConfiguration.this.logger.debug("Json: {}", json); is outputted.
Why is it not flowing to "log" ?
So far I have tried:
- Downgrade spring boot to 2.2.9 (using 2.3.2)
- Fixed the return result of jsonSupplier (to a json string)
- Disabled prometheus / grafana
- Explicitly configured poll spring.cloud.stream.poller.fixed-delay=10
- Used rabbitmq binder and docker image
- Offered some booz to the spring cloud dataflow god.
None worked.
the docker:
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
docker-compose -f ./docker-compose.yml -f ./docker-compose-prometheus.yml up -d
the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>company-cloud-dataflow-apps</artifactId>
        <groupId>br.com.company.cloud.dataflow.apps</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>jdbc-sybase-supplier</artifactId>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>app-starters-micrometer-common</artifactId>
            <version>${app-starters-micrometer-common.version}</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer.prometheus</groupId>
            <artifactId>prometheus-rsocket-spring</artifactId>
            <version>${prometheus-rsocket.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>net.sourceforge.jtds</groupId>
            <artifactId>jtds</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-parameter-names</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-jaxb-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The configuration:
....    
spring.cloud.stream.function.bindings.jsonSupplier-out-0=output
spring.cloud.function.definition=jsonSupplier
The implementation:
@SpringBootApplication
@EnableConfigurationProperties(SybaseSupplierProperties.class)
public class SybaseSupplierConfiguration {
    private final DataSource dataSource;
    private final SybaseSupplierProperties properties;
    private final ObjectMapper objectMapper;
    private final JdbcTemplate jdbcTemplate;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    public SybaseSupplierConfiguration(DataSource dataSource,
                                       JdbcTemplate jdbcTemplate,
                                       SybaseSupplierProperties properties) {
        this.dataSource = dataSource;
        this.jdbcTemplate = jdbcTemplate;
        this.properties = properties;
        objectMapper = new ObjectMapper().registerModule(new ParameterNamesModule())
                .registerModule(new Jdk8Module())
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }
    public static void main(String[] args) {
        SpringApplication.run(SybaseSupplierConfiguration.class, args);
    }
    @Value
    static class IntControle {
        long cdIntControle;
        String deTabela;
    }
    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        String query = "select cd_int_controle, de_tabela from int_controle rowlock readpast " +
                    "where id_status = 0 order by cd_int_controle";
        JdbcPollingChannelAdapter adapter =
                new JdbcPollingChannelAdapter(dataSource, query) {
                    @Override
                    protected Object doReceive() {
                        Object object = super.doReceive();
                        if (object == null) {
                            return null;
                        }
                        @SuppressWarnings("unchecked")
                        List<IntControle> ints = (List<IntControle>) object;
                        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                            try (JsonGenerator jen = objectMapper.createGenerator(out)) {
                                jen.writeStartArray();
                                for (IntControle itm : ints) {
                                    String qry = String.format("select * from vw_integ_%s where cd_int_controle = %d",
                                            itm.getDeTabela(), itm.getCdIntControle());
                                    List<Map<String, Object>> maps = jdbcTemplate.queryForList(qry);
                                    for (Map<String, Object> l : maps) {
                                        jen.writeStartObject();
                                        for (Map.Entry<String, Object> entry : l.entrySet()) {
                                            String k = entry.getKey();
                                            Object v = entry.getValue();
                                            jen.writeFieldName(k);
                                            if (v == null) {
                                                jen.writeNull();
                                            } else {
                                                //caso necessário um item específico, ver em:
                                                // https://stackoverflow.com/questions/6514876/most-efficient-conversion-of-resultset-to-json
                                                jen.writeObject(v);
                                            }
                                        }
                                        jen.writeEndObject();
                                    }
                                }
                                jen.writeEndArray();
                            }
                            String json = out.toString();
                            SybaseSupplierConfiguration.this.logger.debug("Json: {}", json);
                            return json;
                        } catch (IOException e) {
                            throw new IllegalArgumentException("Erro ao converter json", e);
                        }
                    }
                };
        adapter.setMaxRows(properties.getPollSize());
        adapter.setUpdatePerRow(true);
        adapter.setRowMapper((RowMapper<IntControle>) (rs, i) -> new IntControle(rs.getLong(1), rs.getString(2)));
        adapter.setUpdateSql("update int_controle set id_status = 1 where cd_int_controle = :cdIntControle");
        return adapter;
    }
    @Bean
    public Supplier<Message<?>> jsonSupplier() {
        return jdbcMessageSource()::receive;
    }
}
the shell setup:
app register --name jdbc-postgresql-sink --type sink --uri maven://br.com.company.cloud.dataflow.apps:jdbc-postgresql-sink:1.0.0-SNAPSHOT --force
app register --name jdbc-sybase-supplier --type source --uri maven://br.com.company.cloud.dataflow.apps:jdbc-sybase-supplier:1.0.0-SNAPSHOT --force
stream create --name sybase_to_pgsql --definition "jdbc-sybase-supplier | log "
stream deploy --name sybase_to_pgsql
the log:
....
2020-08-02 00:40:18.644  INFO 81 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 0 endpoint(s) beneath base path '/actuator'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {router} as a subscriber to the 'jsonSupplier_integrationflow.channel#0' channel
2020-08-02 00:40:18.793  INFO 81 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.jsonSupplier_integrationflow.channel#0' has 1 subscriber(s).
2020-08-02 00:40:18.794  INFO 81 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-02 00:40:18.795  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2020-08-02 00:40:19.235  INFO 81 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2020-08-02 00:40:19.362  INFO 81 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: sybase_to_pgsql.jdbc-sybase-supplier
2020-08-02 00:40:19.364  INFO 81 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:19.572  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328819571
2020-08-02 00:40:20.403  INFO 81 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:20.477  INFO 81 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328820477
2020-08-02 00:40:20.573  INFO 81 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: um9lJtXTQUmURh9cwOkqxA
2020-08-02 00:40:20.574  INFO 81 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-08-02 00:40:20.622  INFO 81 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.output' has 1 subscriber(s).
2020-08-02 00:40:20.625  INFO 81 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-02 00:40:20.654  INFO 81 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 20031 (http) with context path ''
2020-08-02 00:40:20.674  INFO 81 --- [           main] b.c.c.d.a.s.SybaseSupplierConfiguration  : Started SybaseSupplierConfiguration in 12.982 seconds (JVM running for 14.55)
2020-08-02 00:40:21.160  INFO 81 --- [ask-scheduler-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-2
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:21.189  INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1596328821189
2020-08-02 00:40:21.271  INFO 81 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: um9lJtXTQUmURh9cwOkqxA
 
    