使用Kinesis Video Stream接入视频类设备业务

目前有需求,当设备被某个事件触发时,设备会上传事件先后时间的视频到Kinesis Video Stream(后面简称kvs)上,在kvs上需要按时间触发的前后10秒对视频进行截取,并且拿到事件发生的截图作为视频的封面,最后将视频和截图一起上传到S3

  1. 接收设备事件,通过aws iot的消息路由到指定Lambda

  2. 在Lambda中,通过消息的事件戳确定视频起始和结束时间,然后使用GetClip api来获取到指定时间内的视频

  3. 在Lambda中,使用ffmpeg-platform获取到事件触发时的截图,最后将视和截图频上传到S3

kvs设备事件触发流程图

上面步骤需要在kvs提前创建好流,并且设备已经将视频上传

具体实现

导入jar包

software.amazon.awssdk kinesisvideoarchivedmedia 2.20.48 software.amazon.awssdk kinesisvideo 2.20.48 org.bytedeco javacv 1.5.6 org.bytedeco ffmpeg-platform 4.4-1.5.6

代码实现


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.kinesisvideo.KinesisVideoClient;
import software.amazon.awssdk.services.kinesisvideo.model.APIName;
import software.amazon.awssdk.services.kinesisvideo.model.GetDataEndpointRequest;
import software.amazon.awssdk.services.kinesisvideo.model.GetDataEndpointResponse;
import software.amazon.awssdk.services.kinesisvideoarchivedmedia.KinesisVideoArchivedMediaAsyncClient;
import software.amazon.awssdk.services.kinesisvideoarchivedmedia.model.*;


import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.time.Instant;

import java.util.concurrent.CompletableFuture;


/**

  • @author: caoyu
  • @CreateTime: 2023-05-20
  • @Description: TODO
    /
    public class AwsKvsUtils {

    private static final Logger logger = LogManager.getLogger(AwsKvsUtils.class);
    private static KinesisVideoArchivedMediaAsyncClient kvsClient;

    private static S3Utils s3Utils;


    public static AwsKvsUtils build(AwsCredentialsProvider awsCredentialsProvider, URI endPoint, Region region) {
    KinesisVideoArchivedMediaAsyncClient client = KinesisVideoArchivedMediaAsyncClient
    .builder()
    .credentialsProvider(awsCredentialsProvider)
    .region(region)
    .endpointOverride(endPoint)
    .build();
    S3Utils s3Utils = S3Utils.build(awsCredentialsProvider, region);
    return new AwsKvsUtils(client,s3Utils);
    }

    public void getClip(String streamName, Long time, String bucketName){
    //从kvs获取视频片段
    ClipFragmentSelector clipFragmentSelector = ClipFragmentSelector
    .builder()
    .fragmentSelectorType(ClipFragmentSelectorType.SERVER_TIMESTAMP)
    .timestampRange(ClipTimestampRange.builder().startTimestamp(Instant.ofEpochMilli(time-10
    1000)).endTimestamp(Instant.ofEpochMilli(time+10*1000)).build()).build();
    GetClipRequest request = GetClipRequest.builder().streamName(streamName).clipFragmentSelector(clipFragmentSelector).build();
    File tempFile;
    Path path = null;
    try {
    tempFile = File.createTempFile(“kvs-”, “.mkv”);
    path = tempFile.toPath();
    } catch (IOException e) {
    logger.error(“创建临时文件失败–>{}”, e.getMessage());
    }
    if (path == null){
    logger.error(“获取临时文件目录失败”);
    return;
    }
    CompletableFuture result = kvsClient.getClip(request, path);
    GetClipResponse getClipResponse;
    try {
    getClipResponse = result.get();
    } catch (Exception e) {
    logger.error(“GetClip error”, e);
    return;
    }
    if (!getClipResponse.sdkHttpResponse().isSuccessful()){
    logger.error(“GetClip error, response code: {}, requestId:{}”, getClipResponse.sdkHttpResponse().statusCode(),getClipResponse.responseMetadata().requestId());
    }
    //上传视频到s3
    s3Utils.uploadObject(bucketName, “test.mkv”, path.toString());
    //提取视频帧
    InputStream inputStream = ExtractFrame.extractFrame(path.toString());
    //上传视频帧到s3
    s3Utils.uploadObject(bucketName, “test.png”, inputStream);
    }

    private AwsKvsUtils() {
    }

    private AwsKvsUtils(KinesisVideoArchivedMediaAsyncClient kvsClient, S3Utils s3Utils) {
    AwsKvsUtils.kvsClient = kvsClient;
    AwsKvsUtils.s3Utils = s3Utils;
    }




    public static void setKvsClient(KinesisVideoArchivedMediaAsyncClient kvsClient) {
    AwsKvsUtils.kvsClient = kvsClient;
    }

    public static S3Utils getS3Utils() {
    return s3Utils;
    }

    public static void setS3Utils(S3Utils s3Utils) {
    AwsKvsUtils.s3Utils = s3Utils;
    }

    public static void main(String[] args) throws URISyntaxException {
    //获取到凭证对象
    String streamName = “xxx”;
    String accessKey = “xxxx”;
    String secretKey = “xxxxx”;
    StaticCredentialsProvider staticCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
    //获取kvs访问端点
    KinesisVideoClient kinesisVideoClient = KinesisVideoClient.builder().credentialsProvider(staticCredentialsProvider).region(Region.US_EAST_1).build();
    GetDataEndpointResponse dataEndpoint = kinesisVideoClient.getDataEndpoint(GetDataEndpointRequest.builder().apiName(APIName.GET_CLIP).streamName(streamName).build());
    //构建kvs工具类
    AwsKvsUtils awsKvsUtils = AwsKvsUtils.build(staticCredentialsProvider, new URI(dataEndpoint.dataEndpoint()), Region.US_EAST_1);
    //1684569276774L是事件发生时间戳,kvs-video-storage是s3上的桶名
    awsKvsUtils.getClip(streamName, 1684569276774L, “kvs-video-storage”);
    }
    }

维护好设备和视频url、截图url的关系,用户可以通过s3来查看以往的视频,通过kvs查看直播

注意:ffmpeg-platform包比较大,生产时最好把ffmpeg-platform包进行分层

测试效果

可以在Aws的iot core进行模拟设备上报,也可以在Lambda详情页面配置好事件参数进行测试

这里演示Iot模拟设备上报进行测试]

1.提前上传好视频,拿到对应的时间戳,上传视频参考:使用Kinesis Video Streams上传视频 (hanjiang.work)

2.配置消息路由的规则

image-20230520181405437

3.使用页面的MQTT 测试客户端模拟设备上报

主题:TEST_MAC/TEST_MAC/user/event/trigger

负载:{"time":1685611621116,"data":{"triggerTime":1685611621116,"type":1}}

image-20230601174215779

4.查看s3上是否出现了视频和截图,出现说明执行成功