Skip to content

Package

Parsed Drift Package with data payload

Source code in drift_client/drift_data_package.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class DriftDataPackage:  # pylint: disable=no-member
    """Parsed Drift Package together with its data payload.

    The serialized bytes handed to the constructor are stored unchanged
    (see ``blob``) next to the ``DriftPackage`` message parsed from them.
    """

    # Divisor applied to ``ToMilliseconds()`` by the timestamp properties.
    TS_PRECISION = 1000

    _blob: bytes
    _pkg: DriftPackage

    def __init__(self, blob: bytes):
        """Parse a serialized Drift Package.

        Args:
            blob: Serialized package from database or stream
        """
        self._blob = blob
        parsed = DriftPackage()
        parsed.ParseFromString(blob)
        self._pkg = parsed

    @property
    def blob(self) -> bytes:
        """Serialized DriftPackage exactly as passed to the constructor.

        Can be written to a file to save a .dp file.

        Returns:
            Serialized DriftPackage
        """
        return self._blob

    @property
    def package_id(self) -> int:
        """Package ID

        Returns:
            Package ID (by this all acquired data is synced)
        """
        return self._pkg.id

    @property
    def source_timestamp(self) -> float:
        """Source Timestamp

        Returns:
            Source timestamp (timestamp when the service has received the
                input package), i.e. the millisecond value divided by
                ``TS_PRECISION``
        """
        millis = self._pkg.source_timestamp.ToMilliseconds()
        return millis / self.TS_PRECISION

    @property
    def publish_timestamp(self) -> float:
        """Publish Timestamp

        Returns:
            Publish timestamp (timestamp when the service has done its job
                and sends the output package), i.e. the millisecond value
                divided by ``TS_PRECISION``
        """
        millis = self._pkg.publish_timestamp.ToMilliseconds()
        return millis / self.TS_PRECISION

    @property
    def status_code(self) -> int:
        """Status Code

        Returns:
            Status of the package. Ok (0) means the package is valid
        """
        return self._pkg.status

    @property
    def meta(self) -> Optional[MetaInfo]:
        """Meta information

        Returns:
            The ``meta`` field of the parsed package
        """
        return self._pkg.meta

    @check_status
    def as_raw(self) -> Optional[bytes]:
        """Data payload as raw bytes.

        Every entry of the package's ``data`` field is inspected; if more
        than one holds a ``DataPayload``, the last one wins.

        Returns:
            Data payload as raw. None if no payload in the package
        """
        payload_bytes = None
        for packed in self._pkg.data:
            if not packed.Is(DataPayload.DESCRIPTOR):
                continue
            unpacked = DataPayload()
            packed.Unpack(unpacked)
            payload_bytes = unpacked.data

        return payload_bytes

    @check_status
    def as_buffer(self) -> WaveletBuffer:
        """Data payload as Wavelet Buffer

        Returns:
            Result of ``WaveletBuffer.parse`` applied to the raw payload
        """
        raw = self.as_raw()
        return WaveletBuffer.parse(raw)

    @check_status
    def as_typed_data(self) -> Dict[str, Optional[Variant.SUPPORTED_TYPES]]:
        """Data payload as typed data.

        Returns:
            Mapping from item name to its value; items whose status is not
                ``StatusCode.GOOD`` map to None

        Raises:
            ValueError: if the meta type is not ``MetaInfo.TYPED_DATA``
        """
        meta = self.meta
        if meta.type != MetaInfo.TYPED_DATA:
            raise ValueError("Only typed data supported")

        reader = InputBuffer(self.as_raw())
        result = {}
        for entry in meta.typed_data_info.items:
            # Pop one value per item, good or not, to stay aligned with the
            # item list.
            popped = reader.pop()
            is_good = entry.status == StatusCode.GOOD
            result[entry.name] = popped.value if is_good else None

        return result

    @check_status
    def as_np(self, scale_factor: int = 0) -> np.ndarray:
        """Data payload as NumPy Array

        Args:
            scale_factor: Wavelet composition factor, defaults to 0

        Returns:
            Data payload as NumPy Array
        """
        wavelet_buffer = self.as_buffer()
        return wavelet_buffer.compose(scale_factor)

    @property
    def labels(self) -> Dict[str, str]:
        """Labels as dict (label key -> label value)"""
        return {entry.key: entry.value for entry in self._pkg.labels}

blob: bytes property

Serialized DriftPackage; it can be written to a file to save it as a .dp file

Returns:

Type Description
bytes

Serialized DriftPackage

labels: Dict[str, str] property

Labels as dict

meta: Optional[MetaInfo] property

Meta information

Returns:

Type Description
Optional[MetaInfo]

Meta information about the package

package_id: int property

Package ID

Returns:

Type Description
int

Package ID (all acquired data is synced by this ID)

publish_timestamp: float property

Publish Timestamp

Returns:

Type Description
float

Publish timestamp (the time when the service has finished its job and sends the output package)

source_timestamp: float property

Source Timestamp

Returns:

Type Description
float

Source timestamp (Timestamp when the service has received the input package)

status_code: int property

Status Code

Returns:

Type Description
int

Status of the package. Ok (0) means the package is valid

__init__(blob)

Parsed Drift Package

Parameters:

Name Type Description Default
blob bytes

Serialized package from database or stream

required
Source code in drift_client/drift_data_package.py
30
31
32
33
34
35
36
37
38
39
def __init__(self, blob: bytes):
    """Parse a serialized Drift Package.

    Args:
        blob: Serialized package from database or stream
    """
    # Keep the original bytes next to the parsed message.
    self._blob = blob
    parsed = DriftPackage()
    parsed.ParseFromString(blob)
    self._pkg = parsed

as_buffer()

Data payload as Wavelet Buffer

Returns:

Type Description
WaveletBuffer

Data payload as Wavelet Buffer

Source code in drift_client/drift_data_package.py
115
116
117
118
119
120
121
122
@check_status
def as_buffer(self) -> WaveletBuffer:
    """Data payload as Wavelet Buffer

    Returns:
        Result of ``WaveletBuffer.parse`` applied to the raw payload
    """
    raw = self.as_raw()
    return WaveletBuffer.parse(raw)

as_np(scale_factor=0)

Data payload as NumPy Array

Parameters:

Name Type Description Default
scale_factor int

Wavelet composition factor, defaults to 0

0

Returns:

Type Description
ndarray

Data payload as NumPy Array

Source code in drift_client/drift_data_package.py
141
142
143
144
145
146
147
148
149
150
@check_status
def as_np(self, scale_factor: int = 0) -> np.ndarray:
    """Data payload as NumPy Array

    Args:
        scale_factor: Wavelet composition factor, defaults to 0

    Returns:
        Data payload as NumPy Array
    """
    wavelet_buffer = self.as_buffer()
    return wavelet_buffer.compose(scale_factor)

as_raw()

Data payload as raw

Returns:

Type Description
Optional[bytes]

Data payload as raw. None if no payload in the package

Source code in drift_client/drift_data_package.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@check_status
def as_raw(self) -> Optional[bytes]:
    """Data payload as raw bytes.

    Every entry of the package's ``data`` field is inspected; if more than
    one holds a ``DataPayload``, the last one wins.

    Returns:
        Data payload as raw. None if no payload in the package
    """
    payload_bytes = None
    for packed in self._pkg.data:
        if not packed.Is(DataPayload.DESCRIPTOR):
            continue
        unpacked = DataPayload()
        packed.Unpack(unpacked)
        payload_bytes = unpacked.data

    return payload_bytes

as_typed_data()

Data payload as typed data

Source code in drift_client/drift_data_package.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@check_status
def as_typed_data(self) -> Dict[str, Optional[Variant.SUPPORTED_TYPES]]:
    """Data payload as typed data.

    Returns:
        Mapping from item name to its value; items whose status is not
            ``StatusCode.GOOD`` map to None

    Raises:
        ValueError: if the meta type is not ``MetaInfo.TYPED_DATA``
    """
    meta = self.meta
    if meta.type != MetaInfo.TYPED_DATA:
        raise ValueError("Only typed data supported")

    reader = InputBuffer(self.as_raw())
    result = {}
    for entry in meta.typed_data_info.items:
        # Pop one value per item, good or not, to stay aligned with the
        # item list.
        popped = reader.pop()
        is_good = entry.status == StatusCode.GOOD
        result[entry.name] = popped.value if is_good else None

    return result