Memory Exhaustion Caused by Unreleased Redis Connections

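Some of our oldest Flink jobs are written with the traditional DataStream API. A colleague recently modified one of them and used a JedisPool without explicitly closing it, so its Redis connections were never released. The code looked like this: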

DataStream<VehcileLatestState> dataStream =
          tableEnv
              .toDataStream(source_table)
              .flatMap(
                  new RichFlatMapFunction<Row, VehcileLatestState>() {
                      ObjectMapper objectMapper = new ObjectMapper();
                      // Created in open(), but never closed: this function has no close() override.
                      JedisPool jedisPool = null;

                      @Override
                      public void open(Configuration parameters) throws Exception {
                          jedisPool =
                              new JedisPool(
                                  new GenericObjectPoolConfig<Jedis>(),
                                  "r-xx.redis.rds.xxx.com",
                                  6379,
                                  2000,
                                  "zzzz==");
                      }

                      @Override
                      public void flatMap(Row row, Collector<VehcileLatestState> collector)
                          throws Exception {
                          long partitionId = 0;
                          long kafkaOffset = 0;
                          String kafkaTimestamp = "1970-01-01 00:00:00";
                          xxxx
                          heartbeat.setHeartbeat_v2(1);
                          // try-with-resources returns this Jedis instance to the pool; the pool itself stays open.
                          try (Jedis jedis = jedisPool.getResource()) {
                              String lastTimeKey = String.format("%s_%s", heartbeat.getVehicle_vin(), Constants.VEHICLE_LATEST_TIME_KEY_TAIL);
                              String latestTimestamp = jedis.get(lastTimeKey);
                              if (null != latestTimestamp
                                  && Long.parseLong(latestTimestamp) >= heartbeat.getCurrent_timestamp_seconds()) {
                                  heartbeat.setOut_order(1);
                              } else {
                                  heartbeat.setOut_order(0);
                                  if (!heartbeat.isDirty() || (null == heartbeat.getDirty_reason() || !heartbeat.getDirty_reason().equals(Constants.PARSE_EXCEPTION))) {
                                      String key = String.format("%s_%s", heartbeat.getVehicle_vin(), Constants.VEHICLE_LATEST_STATE_KEY_TAIL);
                                      jedis.setex(key, 7 * 24 * 60 * 60, objectMapper.writeValueAsString(heartbeat));
                                  }
                                  jedis.setex(lastTimeKey, 48 * 60 * 60, heartbeat.getCurrent_timestamp_seconds() + "");
                              }
                          }
                          collector.collect(heartbeat);
                      }
                  });

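In this code each Jedis instance is released (the try-with-resources block returns it to the pool), but the JedisPool itself never is. The function overrides open() to create the pool and has no close() override to shut it down, so the connections held by the pool stay open. The fix is to override close() in the RichFlatMapFunction and call jedisPool.close() there.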
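
To make the lifecycle problem concrete, here is a minimal sketch that runs on the JDK alone. It does not use Flink or Jedis: FakePool, LeakyFunction, FixedFunction and poolsLeftOpen are hypothetical stand-ins invented for illustration, and the repeated open/close cycles are an assumption about how the leak accumulates (for example, across task restarts). The only point it shows is that a pool created in open() has to be shut down in a matching close().

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PoolLifecycleSketch {

    /** Hypothetical stand-in for JedisPool; counts how many pools are still open. */
    static final class FakePool implements AutoCloseable {
        static final AtomicInteger OPEN = new AtomicInteger();

        FakePool() {
            OPEN.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();
        }
    }

    /** Mirrors the original function: open() creates a pool and nothing closes it. */
    static class LeakyFunction {
        protected FakePool pool;

        public void open() {
            pool = new FakePool();
        }

        public void close() {
            // No cleanup, equivalent to not overriding close() in the original job.
        }
    }

    /** Same function with the fix applied: close() shuts the pool down. */
    static final class FixedFunction extends LeakyFunction {
        @Override
        public void close() {
            if (pool != null) {
                pool.close();
                pool = null;
            }
        }
    }

    /** Simulates the given number of open/close cycles and returns the pools left open. */
    public static int poolsLeftOpen(boolean fixed, int restarts) {
        FakePool.OPEN.set(0);
        for (int i = 0; i < restarts; i++) {
            LeakyFunction fn = fixed ? new FixedFunction() : new LeakyFunction();
            fn.open();
            fn.close();
        }
        return FakePool.OPEN.get();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + poolsLeftOpen(false, 5)); // prints "leaky: 5"
        System.out.println("fixed: " + poolsLeftOpen(true, 5));  // prints "fixed: 0"
    }
}
```

In the real job the equivalent change would be a close() override next to open() that null-checks jedisPool and calls jedisPool.close().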